You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/07/21 05:57:21 UTC

[dubbo] branch 3.0 updated (80b22d6 -> 3124dd8)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a change to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git.


    from 80b22d6  Service Discovery Enhancement
     new 838dfd6  metadata report status
     new 3d234d2  service discovery
     new 42f0529  can basically work with InstanceAddressURL
     new 3124dd8  set metadata proxy timeout

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org.apache.dubbo.rpc.cluster.RouterFactory     |   1 -
 .../rpc/cluster/directory/MockDirInvocation.java   |   5 +
 .../router/condition/ConditionRouterTest.java      |   2 +-
 .../support/AbstractClusterInvokerTest.java        |   6 +-
 .../org/apache/dubbo/common/ConfigurationURL.java  |   5 +-
 .../src/main/java/org/apache/dubbo/common/URL.java | 302 +++++++++++++++++++--
 .../dubbo/common/config/ConfigurationUtils.java    |   4 +
 .../dubbo/config/bootstrap/DubboBootstrap.java     |  58 ++--
 .../org/apache/dubbo/config/cache/CacheTest.java   |   2 +-
 .../apache/dubbo/metadata/MetadataConstants.java   |   4 +
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 178 ++++++++++--
 .../dubbo/monitor/support/MonitorFilterTest.java   |   8 +-
 .../dubbo/qos/command/impl/PublishMetadata.java    |  63 +++++
 .../org.apache.dubbo.qos.command.BaseCommand       |   1 +
 .../registry/client/DefaultServiceInstance.java    |  35 ++-
 .../client/EventPublishingServiceDiscovery.java    |   5 +
 .../client/FileSystemServiceDiscovery.java         |   8 +
 .../dubbo/registry/client/InstanceAddressURL.java  | 270 +++++++++++++++---
 .../dubbo/registry/client/ServiceDiscovery.java    |   2 +
 .../registry/client/ServiceDiscoveryRegistry.java  |   4 +-
 .../client/ServiceDiscoveryRegistryDirectory.java  | 137 ++++++----
 .../dubbo/registry/client/ServiceInstance.java     |   2 +
 .../listener/ServiceInstancesChangedListener.java  |  28 +-
 .../registry/client/metadata/MetadataUtils.java    |   3 +-
 .../metadata/ServiceInstanceMetadataUtils.java     |  55 ++--
 .../StandardMetadataServiceURLBuilder.java         |   7 +-
 .../store/InMemoryWritableMetadataService.java     |  12 +
 .../metadata/store/RemoteMetadataServiceImpl.java  |  20 +-
 .../registry/integration/DynamicDirectory.java     |   9 +-
 .../registry/integration/RegistryDirectory.java    |   2 +-
 .../registry/support/AbstractRegistryFactory.java  |  12 +
 .../registry/client/InMemoryServiceDiscovery.java  |   8 +
 .../zookeeper/ZookeeperServiceDiscovery.java       |  18 +-
 .../apache/dubbo/remoting/exchange/Exchangers.java |   2 +-
 .../dubbo/remoting/transport/AbstractEndpoint.java |   2 +-
 .../main/java/org/apache/dubbo/rpc/Invocation.java |   2 +
 .../main/java/org/apache/dubbo/rpc/RpcContext.java |  66 ++---
 .../java/org/apache/dubbo/rpc/RpcInvocation.java   |  32 ++-
 .../org/apache/dubbo/rpc/filter/GenericFilter.java |   2 +-
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |   2 +-
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |   2 +-
 .../dubbo/rpc/proxy/InvokerInvocationHandler.java  |  10 +-
 .../dubbo/rpc/filter/ExceptionFilterTest.java      |   8 +-
 .../apache/dubbo/rpc/filter/GenericFilterTest.java |   8 +-
 .../dubbo/rpc/filter/GenericImplFilterTest.java    |   8 +-
 .../apache/dubbo/rpc/proxy/AbstractProxyTest.java  |   4 +-
 .../apache/dubbo/rpc/support/MockInvocation.java   |   5 +
 .../org/apache/dubbo/rpc/support/RpcUtilsTest.java |  48 ++--
 .../rpc/protocol/dubbo/CallbackServiceCodec.java   |  10 +-
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   2 +-
 .../dubbo/ReferenceCountExchangeClient.java        |   9 +-
 .../dubbo/rpc/protocol/thrift/ThriftProtocol.java  |   2 +-
 52 files changed, 1147 insertions(+), 353 deletions(-)
 copy dubbo-rpc/dubbo-rpc-grpc/src/main/java/org/apache/dubbo/rpc/protocol/grpc/GrpcConfig.java => dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java (91%)
 create mode 100644 dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java


[dubbo] 02/04: service discovery

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 3d234d29fcd7dc67ffbd542edd7053d68f35c555
Author: ken.lj <ke...@gmail.com>
AuthorDate: Thu Jul 9 18:01:19 2020 +0800

    service discovery
---
 .../org.apache.dubbo.rpc.cluster.RouterFactory     |  1 -
 .../dubbo/common/config/ConfigurationUtils.java    |  4 ++
 .../dubbo/config/bootstrap/DubboBootstrap.java     | 58 ++++++++------------
 .../apache/dubbo/metadata/MetadataConstants.java   |  2 +
 .../org/apache/dubbo/metadata/MetadataInfo.java    |  7 +++
 .../dubbo/qos/command/impl/PublishMetadata.java    | 63 ++++++++++++++++++++++
 .../org.apache.dubbo.qos.command.BaseCommand       |  1 +
 .../client/EventPublishingServiceDiscovery.java    |  5 ++
 .../client/FileSystemServiceDiscovery.java         |  8 +++
 .../dubbo/registry/client/ServiceDiscovery.java    |  2 +
 .../client/ServiceDiscoveryRegistryDirectory.java  |  2 +-
 .../listener/ServiceInstancesChangedListener.java  | 14 +++--
 .../metadata/ServiceInstanceMetadataUtils.java     | 37 +++++++++++++
 .../store/InMemoryWritableMetadataService.java     | 12 +++++
 .../metadata/store/RemoteMetadataServiceImpl.java  |  6 +--
 .../registry/support/AbstractRegistryFactory.java  | 12 +++++
 .../registry/client/InMemoryServiceDiscovery.java  |  8 +++
 .../zookeeper/ZookeeperServiceDiscovery.java       | 18 +++++--
 18 files changed, 211 insertions(+), 49 deletions(-)

diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
index 13e307b..2a807f0 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -5,4 +5,3 @@ service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactor
 app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
 tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
 mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
-instance=org.apache.dubbo.rpc.cluster.router.service.InstanceRouterFactory
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
index 70fd9f1..a7c0693 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
@@ -100,6 +100,10 @@ public class ConfigurationUtils {
         return StringUtils.trim(getGlobalConfiguration().getString(property, defaultValue));
     }
 
+    public static int get(String property, int defaultValue) {
+        return getGlobalConfiguration().getInt(property, defaultValue);
+    }
+
     public static Map<String, String> parseProperties(String content) throws IOException {
         Map<String, String> map = new HashMap<>();
         if (StringUtils.isEmpty(content)) {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 455535c6..a9f2bcf 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.config.bootstrap;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.config.Environment;
 import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
 import org.apache.dubbo.common.config.configcenter.wrapper.CompositeDynamicConfiguration;
@@ -58,17 +59,16 @@ import org.apache.dubbo.config.utils.ReferenceConfigCache;
 import org.apache.dubbo.event.EventDispatcher;
 import org.apache.dubbo.event.EventListener;
 import org.apache.dubbo.event.GenericEventListener;
-import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.MetadataServiceExporter;
 import org.apache.dubbo.metadata.WritableMetadataService;
 import org.apache.dubbo.metadata.report.MetadataReportInstance;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
-import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.ServiceInstanceCustomizer;
 import org.apache.dubbo.registry.client.metadata.MetadataUtils;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
+import org.apache.dubbo.registry.client.metadata.store.InMemoryWritableMetadataService;
 import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
 import org.apache.dubbo.registry.support.AbstractRegistryFactory;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -89,7 +89,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -99,11 +98,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_METADATA
 import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
 import static org.apache.dubbo.common.function.ThrowableAction.execute;
 import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
+import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_PUBLISH_DELAY;
+import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PUBLISH_DELAY_KEY;
 import static org.apache.dubbo.metadata.WritableMetadataService.getDefaultExtension;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.calInstanceRevision;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setMetadataStorageType;
+import static org.apache.dubbo.registry.support.AbstractRegistryFactory.getServiceDiscoveries;
 import static org.apache.dubbo.remoting.Constants.CLIENT_KEY;
-import static org.apache.dubbo.rpc.Constants.ID_KEY;
 
 /**
  * See {@link ApplicationModel} and {@link ExtensionLoader} for why this class is designed to be singleton.
@@ -734,15 +735,6 @@ public class DubboBootstrap extends GenericEventListener {
         addEventListener(this);
     }
 
-    private List<ServiceDiscovery> getServiceDiscoveries() {
-        return AbstractRegistryFactory.getRegistries()
-                .stream()
-                .filter(registry -> registry instanceof ServiceDiscoveryRegistry)
-                .map(registry -> (ServiceDiscoveryRegistry) registry)
-                .map(ServiceDiscoveryRegistry::getServiceDiscovery)
-                .collect(Collectors.toList());
-    }
-
     /**
      * Start the bootstrap
      */
@@ -1024,6 +1016,18 @@ public class DubboBootstrap extends GenericEventListener {
 
         ServiceInstance serviceInstance = createServiceInstance(serviceName, host, port);
 
+        doRegisterServiceInstance(serviceInstance);
+
+        // scheduled task for updating Metadata and ServiceInstance
+        executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
+            InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension();
+            localMetadataService.blockUntilUpdated();
+            ServiceInstanceMetadataUtils.refreshMetadataAndInstance();
+        }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MICROSECONDS);
+    }
+
+    private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
+        //FIXME
         publishMetadataToRemote(serviceInstance);
 
         getServiceDiscoveries().forEach(serviceDiscovery ->
@@ -1032,31 +1036,13 @@ public class DubboBootstrap extends GenericEventListener {
             // register metadata
             serviceDiscovery.register(serviceInstance);
         });
-
-        // scheduled task for updating Metadata and ServiceInstance
-        executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
-            publishMetadataToRemote(serviceInstance);
-
-            getServiceDiscoveries().forEach(serviceDiscovery ->
-            {
-                calInstanceRevision(serviceDiscovery, serviceInstance);
-                // register metadata
-                serviceDiscovery.register(serviceInstance);
-            });
-        }, 0, 5000, TimeUnit.MICROSECONDS);
     }
 
     private void publishMetadataToRemote(ServiceInstance serviceInstance) {
+//        InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService)WritableMetadataService.getDefaultExtension();
+//        localMetadataService.blockUntilUpdated();
         RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
-        remoteMetadataService.publishMetadata(serviceInstance);
-    }
-
-    private void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) {
-        String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
-        MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster);
-        if (metadataInfo != null) {
-            instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
-        }
+        remoteMetadataService.publishMetadata(serviceInstance.getServiceName());
     }
 
     private URL selectMetadataServiceExportedURL() {
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
index e03ddd6..7ba0a43 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
@@ -23,4 +23,6 @@ public class MetadataConstants {
     public static final String META_DATA_STORE_TAG = ".metaData";
     public static final String SERVICE_META_DATA_STORE_TAG = ".smd";
     public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd";
+    public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay";
+    public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000;
 }
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 73e736f..4db0298 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.compiler.support.ClassUtils;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 
 import java.io.Serializable;
@@ -39,6 +40,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 
 public class MetadataInfo implements Serializable {
+    public static String DEFAULT_REVISION = "0";
     private String app;
     private String revision;
     private Map<String, ServiceInfo> services;
@@ -85,6 +87,11 @@ public class MetadataInfo implements Serializable {
         if (revision != null && hasReported()) {
             return revision;
         }
+
+        if (CollectionUtils.isEmptyMap(services)) {
+            return DEFAULT_REVISION;
+        }
+
         StringBuilder sb = new StringBuilder();
         sb.append(app);
         for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) {
diff --git a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java
new file mode 100644
index 0000000..990854c
--- /dev/null
+++ b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.command.impl;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.qos.command.BaseCommand;
+import org.apache.dubbo.qos.command.CommandContext;
+import org.apache.dubbo.qos.command.annotation.Cmd;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Cmd(name = "publishMetadata", summary = "update service metadata and service instance", example = {
+        "publishMetadata",
+        "publishMetadata 5"
+})
+public class PublishMetadata implements BaseCommand {
+    private static final Logger logger = LoggerFactory.getLogger(PublishMetadata.class);
+    private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+    private ScheduledFuture future;
+
+    @Override
+    public String execute(CommandContext commandContext, String[] args) {
+        logger.info("received publishMetadata command.");
+
+        if (ArrayUtils.isEmpty(args)) {
+            ServiceInstanceMetadataUtils.refreshMetadataAndInstance();
+            return "publish metadata succeeded.";
+        }
+
+        try {
+            int delay = Integer.parseInt(args[0]);
+            if (future == null || future.isDone() || future.isCancelled()) {
+                future = executorRepository.nextScheduledExecutor()
+                        .scheduleWithFixedDelay(ServiceInstanceMetadataUtils::refreshMetadataAndInstance, 0, delay, TimeUnit.MILLISECONDS);
+            }
+        } catch (NumberFormatException e) {
+            logger.error("Wrong delay param", e);
+            return "publishMetadata failed! Wrong delay param!";
+        }
+        return "publish task submitted, will publish in " + args[0] + " seconds.";
+    }
+
+}
diff --git a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
index cb6e9a7..b92b6b2 100644
--- a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
+++ b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
@@ -5,3 +5,4 @@ ls=org.apache.dubbo.qos.command.impl.Ls
 offline=org.apache.dubbo.qos.command.impl.Offline
 ready=org.apache.dubbo.qos.command.impl.Ready
 version=org.apache.dubbo.qos.command.impl.Version
+publish-metadata=org.apache.dubbo.qos.command.impl.PublishMetadata
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
index b5517b7..ee99000 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
@@ -229,6 +229,11 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery {
     }
 
     @Override
+    public ServiceInstance getLocalInstance() {
+        return serviceDiscovery.getLocalInstance();
+    }
+
+    @Override
     public void initialize(URL registryURL) {
 
         assertInitialized(INITIALIZE_ACTION);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
index ba8d7d3..2a51168 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
@@ -62,6 +62,8 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
 
     private FileSystemDynamicConfiguration dynamicConfiguration;
 
+    private ServiceInstance serviceInstance;
+
     @Override
     public void onEvent(ServiceInstancesChangedEvent event) {
 
@@ -134,7 +136,13 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
     }
 
     @Override
+    public ServiceInstance getLocalInstance() {
+        return serviceInstance;
+    }
+
+    @Override
     public void register(ServiceInstance serviceInstance) throws RuntimeException {
+        this.serviceInstance = serviceInstance;
         String serviceInstanceId = getServiceInstanceId(serviceInstance);
         String serviceName = getServiceName(serviceInstance);
         String content = toJSONString(serviceInstance);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index f9e667b..9800c35 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -267,6 +267,8 @@ public interface ServiceDiscovery extends Prioritized {
         return null;
     }
 
+    ServiceInstance getLocalInstance();
+
     /**
      * A human-readable description of the implementation
      *
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 9890988..3fd7713 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -64,7 +64,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
     }
 
     private void refreshInvoker(List<URL> invokerUrls) {
-        Assert.notNull(invokerUrls, "invokerUrls should not be null");
+        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty:// to clear address.");
 
         if (invokerUrls.size() == 1
                 && invokerUrls.get(0) != null
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 1901e34..8af48a3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -45,6 +45,7 @@ import java.util.TreeSet;
 
 import static org.apache.dubbo.common.constants.CommonConstants.REGISTER_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
+import static org.apache.dubbo.metadata.MetadataInfo.DEFAULT_REVISION;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
 
 /**
@@ -86,11 +87,17 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         String appName = event.getServiceName();
         allInstances.put(appName, event.getServiceInstances());
 
+        Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
+        Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
+        Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
         for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
             List<ServiceInstance> instances = entry.getValue();
             for (ServiceInstance instance : instances) {
                 String revision = getExportedServicesRevision(instance);
-                Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
+                if (DEFAULT_REVISION.equals(revision)) {
+                    logger.info("Find instance without valid service metadata: " + instance.getAddress());
+                    continue;
+                }
                 List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
                 subInstances.add(instance);
 
@@ -104,7 +111,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
                     }
                 }
 
-                Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
                 if (metadata != null) {
                     parseMetadata(revision, metadata, localServiceToRevisions);
                     ((DefaultServiceInstance) instance).setServiceMetadata(metadata);
@@ -115,7 +121,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 //                    set.add(revision);
 //                }
 
-                Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
                 localServiceToRevisions.forEach((serviceKey, revisions) -> {
                     List<URL> urls = revisionsToUrls.get(revisions);
                     if (urls != null) {
@@ -140,8 +145,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
     private Map<String, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
         Map<String, ServiceInfo> serviceInfos = metadata.getServices();
         for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
-            String serviceKey = entry.getValue().getServiceKey();
-            Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new TreeSet<>());
+            Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getKey(), k -> new TreeSet<>());
             set.add(revision);
         }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index 75903ee..57010cf 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -18,9 +18,14 @@ package org.apache.dubbo.registry.client.metadata;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import com.alibaba.fastjson.JSON;
 
@@ -39,6 +44,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
 import static org.apache.dubbo.common.utils.StringUtils.isBlank;
 import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_PROVIDER_KEYS;
 import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
+import static org.apache.dubbo.rpc.Constants.ID_KEY;
 
 /**
  * The Utilities class for the {@link ServiceInstance#getMetadata() metadata of the service instance}
@@ -87,6 +93,8 @@ public class ServiceInstanceMetadataUtils {
 
     public static String METADATA_CLUSTER_PROPERTY_NAME = "dubbo.metadata.cluster";
 
+    public static String INSTANCE_REVISION_UPDATED_KEY = "dubbo.instance.revision.updated";
+
     /**
      * Get the multiple {@link URL urls'} parameters of {@link MetadataService MetadataService's} Metadata
      *
@@ -247,6 +255,62 @@ public class ServiceInstanceMetadataUtils {
         return null;
     }
 
+    /**
+     * Aligns the exported-services revision stored in the metadata of the given instance with the revision
+     * of the {@link MetadataInfo} registered under the discovery's {@code ID_KEY} URL parameter.
+     * <p>
+     * Nothing happens when no {@link MetadataInfo} exists for that key. When an already present revision is
+     * replaced, {@link #INSTANCE_REVISION_UPDATED_KEY} is set to "true"; the first assignment is not flagged.
+     *
+     * @param serviceDiscovery the discovery whose URL provides the key used to look up the {@link MetadataInfo}
+     * @param instance         the service instance whose metadata and extend params are modified in place
+     */
+    public static void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) {
+        String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+        MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster);
+        if (metadataInfo != null) {
+            String existingInstanceRevision = instance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
+            if (!metadataInfo.getRevision().equals(existingInstanceRevision)) {
+                instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
+                if (existingInstanceRevision != null) {// skip the first registration.
+                    instance.getExtendParams().put(INSTANCE_REVISION_UPDATED_KEY, "true");
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether the instance has been flagged through {@link #INSTANCE_REVISION_UPDATED_KEY}.
+     *
+     * @param instance the service instance to inspect
+     * @return {@code true} if the extend params hold "true" under {@link #INSTANCE_REVISION_UPDATED_KEY}
+     */
+    public static boolean isInstanceUpdated(ServiceInstance instance) {
+        return "true".equals(instance.getExtendParams().get(INSTANCE_REVISION_UPDATED_KEY));
+    }
+
+    /**
+     * Publishes the application metadata through the remote metadata service, then recalculates the
+     * revision of the local instance of every service discovery and passes that instance to its update().
+     * <p>
+     * Discoveries that do not have a local instance yet are skipped.
+     */
+    public static void refreshMetadataAndInstance() {
+        RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
+        remoteMetadataService.publishMetadata(ApplicationModel.getName());
+
+        AbstractRegistryFactory.getServiceDiscoveries().forEach(serviceDiscovery -> {
+            ServiceInstance instance = serviceDiscovery.getLocalInstance();
+            if (instance == null) {
+                // no instance has been registered through this discovery yet, so there is nothing to refresh
+                return;
+            }
+            calInstanceRevision(serviceDiscovery, instance);
+            // update service instance revision
+            serviceDiscovery.update(instance);
+        });
+    }
+
     /**
      * Set the default parameters via the specified {@link URL providerURL}
      *
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
index 8605eaa..2b747de 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -74,6 +75,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
      */
     ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs = new ConcurrentSkipListMap<>();
     ConcurrentMap<String, MetadataInfo> metadataInfos;
+    final Semaphore metadataSemaphore = new Semaphore(1);
 
     // ==================================================================================== //
 
@@ -131,6 +133,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
             });
             metadataInfo.addService(new ServiceInfo(url));
         }
+        metadataSemaphore.release();
         return addURL(exportedServiceURLs, url);
     }
 
@@ -145,6 +148,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
                 metadataInfos.remove(key);
             }
         }
+        metadataSemaphore.release();
         return removeURL(exportedServiceURLs, url);
     }
 
@@ -202,6 +206,22 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
         return null;
     }
 
+    /**
+     * Blocks the calling thread until a permit can be acquired from {@code metadataSemaphore}.
+     * <p>
+     * If the thread is interrupted while waiting, a warning is logged and the interrupt status is restored
+     * so that the caller can still observe the interruption.
+     */
+    public void blockUntilUpdated() {
+        try {
+            metadataSemaphore.acquire();
+        } catch (InterruptedException e) {
+            // acquire() clears the interrupt status when it throws; set it again instead of swallowing it
+            Thread.currentThread().interrupt();
+            logger.warn("metadata refresh thread has been interrupted unexpectedly while wating for update.", e);
+        }
+    }
+
     public Map<String, MetadataInfo> getMetadataInfos() {
         return metadataInfos;
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index ee41050..00aa92a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -47,7 +47,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
 
 public class RemoteMetadataServiceImpl {
-
     protected final Logger logger = LoggerFactory.getLogger(getClass());
     private WritableMetadataService localMetadataService;
 
@@ -59,11 +58,11 @@ public class RemoteMetadataServiceImpl {
         return MetadataReportInstance.getMetadataReports(true);
     }
 
-    public void publishMetadata(ServiceInstance instance) {
+    public void publishMetadata(String serviceName) {
         Map<String, MetadataInfo> metadataInfos = localMetadataService.getMetadataInfos();
         metadataInfos.forEach((registryKey, metadataInfo) -> {
             if (!metadataInfo.hasReported()) {
-                SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
+                SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(serviceName, metadataInfo.getRevision());
                 metadataInfo.getRevision();
                 metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey);
                 MetadataReport metadataReport = getMetadataReports().get(registryKey);
@@ -71,6 +70,7 @@ public class RemoteMetadataServiceImpl {
                     metadataReport = getMetadataReports().entrySet().iterator().next().getValue();
                 }
                 metadataReport.publishAppMetadata(identifier, metadataInfo);
+                metadataInfo.markReported();
             }
         });
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
index 7ea5559..541c22a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
@@ -24,6 +24,8 @@ import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.RegistryFactory;
 import org.apache.dubbo.registry.RegistryService;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
@@ -69,6 +72,19 @@ public abstract class AbstractRegistryFactory implements RegistryFactory {
         return REGISTRIES.get(key);
     }
 
+    /**
+     * Collects the {@link ServiceDiscovery} of every registry returned by {@code getRegistries()} that is a
+     * {@link ServiceDiscoveryRegistry}; registries of any other type are ignored.
+     *
+     * @return a list with one {@link ServiceDiscovery} per {@link ServiceDiscoveryRegistry}
+     */
+    public static List<ServiceDiscovery> getServiceDiscoveries() {
+        return getRegistries().stream()
+                .filter(ServiceDiscoveryRegistry.class::isInstance)
+                .map(ServiceDiscoveryRegistry.class::cast)
+                .map(ServiceDiscoveryRegistry::getServiceDiscovery)
+                .collect(Collectors.toList());
+    }
+
     /**
      * Close all created registries
      */
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
index fbc410b..93043ba 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
@@ -42,6 +42,8 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery {
 
     private Map<String, List<ServiceInstance>> repository = new HashMap<>();
 
+    private ServiceInstance serviceInstance;
+
     @Override
     public Set<String> getServices() {
         return repository.keySet();
@@ -68,12 +70,18 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery {
         return new DefaultPage<>(offset, pageSize, data, totalSize);
     }
 
+    @Override
+    public ServiceInstance getLocalInstance() {
+        return serviceInstance;
+    }
+
     public String toString() {
         return "InMemoryServiceDiscovery";
     }
 
     @Override
     public void register(ServiceInstance serviceInstance) throws RuntimeException {
+        this.serviceInstance = serviceInstance;
         String serviceName = serviceInstance.getServiceName();
         List<ServiceInstance> serviceInstances = repository.computeIfAbsent(serviceName, s -> new LinkedList<>());
         if (!serviceInstances.contains(serviceInstance)) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 29e6790..a4cdd6c 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -41,6 +41,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.dubbo.common.function.ThrowableFunction.execute;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated;
 import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
 import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
 import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
@@ -64,6 +65,8 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery {
 
     private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
 
+    private ServiceInstance serviceInstance;
+
     /**
      * The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher}
      */
@@ -87,16 +90,25 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery {
         serviceDiscovery.close();
     }
 
+    @Override
+    public ServiceInstance getLocalInstance() {
+        return serviceInstance;
+    }
+
     public void register(ServiceInstance serviceInstance) throws RuntimeException {
+        this.serviceInstance = serviceInstance;
         doInServiceRegistry(serviceDiscovery -> {
             serviceDiscovery.registerService(build(serviceInstance));
         });
     }
 
     public void update(ServiceInstance serviceInstance) throws RuntimeException {
-        doInServiceRegistry(serviceDiscovery -> {
-            serviceDiscovery.updateService(build(serviceInstance));
-        });
+        this.serviceInstance = serviceInstance;
+        if (isInstanceUpdated(serviceInstance)) {
+            doInServiceRegistry(serviceDiscovery -> {
+                serviceDiscovery.updateService(build(serviceInstance));
+            });
+        }
     }
 
     public void unregister(ServiceInstance serviceInstance) throws RuntimeException {


[dubbo] 04/04: set metadata proxy timeout

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 3124dd8ac5812a16c4774a5fbe03f985ea826f7e
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Jul 17 17:12:09 2020 +0800

    set metadata proxy timeout
---
 .../rpc/cluster/directory/MockDirInvocation.java   |   5 +
 .../router/condition/ConditionRouterTest.java      |   2 +-
 .../support/AbstractClusterInvokerTest.java        |   6 +-
 .../org/apache/dubbo/common/ConfigurationURL.java  |  12 +-
 .../src/main/java/org/apache/dubbo/common/URL.java | 290 +++++++++++++++++++--
 .../org/apache/dubbo/config/cache/CacheTest.java   |   2 +-
 .../apache/dubbo/metadata/MetadataConstants.java   |   2 +
 .../org/apache/dubbo/metadata/MetadataInfo.java    |  18 ++
 .../dubbo/monitor/support/MonitorFilterTest.java   |   8 +-
 .../registry/client/DefaultServiceInstance.java    |   8 +
 .../dubbo/registry/client/InstanceAddressURL.java  | 187 ++++++++++---
 .../client/ServiceDiscoveryRegistryDirectory.java  |  72 ++---
 .../dubbo/registry/client/ServiceInstance.java     |   2 +
 .../listener/ServiceInstancesChangedListener.java  |  14 +-
 .../registry/client/metadata/MetadataUtils.java    |   3 +-
 .../StandardMetadataServiceURLBuilder.java         |   7 +-
 .../main/java/org/apache/dubbo/rpc/Invocation.java |   2 +
 .../main/java/org/apache/dubbo/rpc/RpcContext.java |  66 ++---
 .../java/org/apache/dubbo/rpc/RpcInvocation.java   |  32 ++-
 .../org/apache/dubbo/rpc/filter/GenericFilter.java |   2 +-
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |   2 +-
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |   2 +-
 .../dubbo/rpc/proxy/InvokerInvocationHandler.java  |   9 +-
 .../dubbo/rpc/filter/ExceptionFilterTest.java      |   8 +-
 .../apache/dubbo/rpc/filter/GenericFilterTest.java |   8 +-
 .../dubbo/rpc/filter/GenericImplFilterTest.java    |   8 +-
 .../apache/dubbo/rpc/proxy/AbstractProxyTest.java  |   4 +-
 .../apache/dubbo/rpc/support/MockInvocation.java   |   5 +
 .../org/apache/dubbo/rpc/support/RpcUtilsTest.java |  48 ++--
 .../rpc/protocol/dubbo/CallbackServiceCodec.java   |  10 +-
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   7 +-
 .../dubbo/ReferenceCountExchangeClient.java        |   9 +-
 32 files changed, 638 insertions(+), 222 deletions(-)

diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java
index 7a4ffb9..bc237b9 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java
@@ -52,6 +52,11 @@ public class MockDirInvocation implements Invocation {
         return null;
     }
 
+    @Override
+    public String getProtocolServiceKey() {
+        return null;
+    }
+
     public String getMethodName() {
         return "echo";
     }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java
index 00fdf53..204ce9a 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java
@@ -137,7 +137,7 @@ public class ConditionRouterTest {
 
     @Test
     public void testRoute_methodRoute() {
-        Invocation invocation = new RpcInvocation("getFoo", "com.foo.BarService", new Class<?>[0], new Object[0]);
+        Invocation invocation = new RpcInvocation("getFoo", "com.foo.BarService", "", new Class<?>[0], new Object[0]);
         // More than one methods, mismatch
         Router router = new ConditionRouterFactory().getRouter(getRouteUrl("methods=getFoo => host = 1.2.3.4"));
         boolean matchWhen = ((ConditionRouter) router).matchWhen(
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
index a27a70e..3f94910 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
@@ -494,21 +494,21 @@ public class AbstractClusterInvokerTest {
         Directory<DemoService> directory = new StaticDirectory<DemoService>(invokers);
         FailoverClusterInvoker<DemoService> failoverClusterInvoker = new FailoverClusterInvoker<DemoService>(directory);
         try {
-            failoverClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0]));
+            failoverClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0]));
             Assertions.fail();
         } catch (RpcException e) {
             Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode());
         }
         ForkingClusterInvoker<DemoService> forkingClusterInvoker = new ForkingClusterInvoker<DemoService>(directory);
         try {
-            forkingClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0]));
+            forkingClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0]));
             Assertions.fail();
         } catch (RpcException e) {
             Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode());
         }
         FailfastClusterInvoker<DemoService> failfastClusterInvoker = new FailfastClusterInvoker<DemoService>(directory);
         try {
-            failfastClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0]));
+            failfastClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0]));
             Assertions.fail();
         } catch (RpcException e) {
             Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode());
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java
similarity index 56%
copy from dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
copy to dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java
index 7ba0a43..2042277 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java
@@ -14,15 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.metadata;
+package org.apache.dubbo.common;
 
-public class MetadataConstants {
-    public static final String KEY_SEPARATOR = ":";
-    public static final String DEFAULT_PATH_TAG = "metadata";
-    public static final String KEY_REVISON_PREFIX = "revision";
-    public static final String META_DATA_STORE_TAG = ".metaData";
-    public static final String SERVICE_META_DATA_STORE_TAG = ".smd";
-    public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd";
-    public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay";
-    public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000;
+public class ConfigurationURL extends URL {
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 9742ebb..211d6f8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -602,7 +602,7 @@ class URL implements Serializable {
         return Arrays.asList(strArray);
     }
 
-    private Map<String, Number> getNumbers() {
+    protected Map<String, Number> getNumbers() {
         // concurrent initialization is tolerant
         if (numbers == null) {
             numbers = new ConcurrentHashMap<>();
@@ -610,7 +610,7 @@ class URL implements Serializable {
         return numbers;
     }
 
-    private Map<String, Map<String, Number>> getMethodNumbers() {
+    protected Map<String, Map<String, Number>> getMethodNumbers() {
         if (methodNumbers == null) { // concurrent initialization is tolerant
             methodNumbers = new ConcurrentHashMap<>();
         }
@@ -795,7 +795,7 @@ class URL implements Serializable {
     }
 
     public String getMethodParameterStrict(String method, String key) {
-        Map<String, String> keyMap = methodParameters.get(method);
+        Map<String, String> keyMap = getMethodParameters().get(method);
         String value = null;
         if (keyMap != null) {
             value = keyMap.get(key);
@@ -804,7 +804,7 @@ class URL implements Serializable {
     }
 
     public String getMethodParameter(String method, String key) {
-        Map<String, String> keyMap = methodParameters.get(method);
+        Map<String, String> keyMap = getMethodParameters().get(method);
         String value = null;
         if (keyMap != null) {
             value = keyMap.get(key);
@@ -1653,16 +1653,276 @@ class URL implements Serializable {
         subParameter.put(key, value);
     }
 
-//    public String getServiceParameter(String service, String key) {
-//        return getParameter(key);
-//    }
-//
-//    public String getServiceMethodParameter(String service, String key) {
-//        return getParameter(key);
-//    }
-//
-//    public String getServiceParameter(String service, String key) {
-//        return getParameter(key);
-//    }
+    /* add service scope operations, see InstanceAddressURL */
+    public Map<String, String> getServiceParameters(String service) {
+        return getParameters();
+    }
+
+    public String getServiceParameter(String service, String key) {
+        return getParameter(key);
+    }
+
+    public String getServiceParameter(String service, String key, String defaultValue) {
+        String value = getServiceParameter(service, key);
+        return StringUtils.isEmpty(value) ? defaultValue : value;
+    }
+
+    public int getServiceParameter(String service, String key, int defaultValue) {
+        return getParameter(key, defaultValue);
+    }
+
+    public double getServiceParameter(String service, String key, double defaultValue) {
+        Number n = getServiceNumbers(service).get(key); // service-scoped cache of already parsed numbers
+        if (n != null) {
+            return n.doubleValue();
+        }
+        String value = getServiceParameter(service, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue; // absent or empty values are not cached
+        }
+        double d = Double.parseDouble(value);
+        getServiceNumbers(service).put(key, d); // store in the same map the lookup above reads from
+        return d;
+    }
+
+    public float getServiceParameter(String service, String key, float defaultValue) {
+        Number n = getNumbers().get(key);
+        if (n != null) {
+            return n.floatValue();
+        }
+        String value = getServiceParameter(service, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        float f = Float.parseFloat(value);
+        getNumbers().put(key, f);
+        return f;
+    }
+
+    public long getServiceParameter(String service, String key, long defaultValue) {
+        Number n = getNumbers().get(key);
+        if (n != null) {
+            return n.longValue();
+        }
+        String value = getServiceParameter(service, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        long l = Long.parseLong(value);
+        getNumbers().put(key, l);
+        return l;
+    }
+
+    public short getServiceParameter(String service, String key, short defaultValue) {
+        Number n = getNumbers().get(key);
+        if (n != null) {
+            return n.shortValue();
+        }
+        String value = getServiceParameter(service, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        short s = Short.parseShort(value);
+        getNumbers().put(key, s);
+        return s;
+    }
+
+    public byte getServiceParameter(String service, String key, byte defaultValue) {
+        Number n = getNumbers().get(key);
+        if (n != null) {
+            return n.byteValue();
+        }
+        String value = getServiceParameter(service, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        byte b = Byte.parseByte(value);
+        getNumbers().put(key, b);
+        return b;
+    }
+
+    public char getServiceParameter(String service, String key, char defaultValue) {
+        String value = getServiceParameter(service, key);
+        return StringUtils.isEmpty(value) ? defaultValue : value.charAt(0);
+    }
+
+    public boolean getServiceParameter(String service, String key, boolean defaultValue) {
+        String value = getServiceParameter(service, key);
+        return StringUtils.isEmpty(value) ? defaultValue : Boolean.parseBoolean(value);
+    }
+
+    public boolean hasServiceParameter(String service, String key) {
+        String value = getServiceParameter(service, key);
+        return value != null && value.length() > 0;
+    }
+
+    public float getPositiveServiceParameter(String service, String key, float defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        float value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public double getPositiveServiceParameter(String service, String key, double defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        double value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public long getPositiveServiceParameter(String service, String key, long defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        long value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public int getPositiveServiceParameter(String service, String key, int defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        int value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public short getPositiveServiceParameter(String service, String key, short defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        short value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public byte getPositiveServiceParameter(String service, String key, byte defaultValue) {
+        if (defaultValue <= 0) {
+            throw new IllegalArgumentException("defaultValue <= 0");
+        }
+        byte value = getServiceParameter(service, key, defaultValue);
+        return value <= 0 ? defaultValue : value;
+    }
+
+    public String getServiceMethodParameterAndDecoded(String service, String method, String key) {
+        return URL.decode(getServiceMethodParameter(service, method, key));
+    }
+
+    public String getServiceMethodParameterAndDecoded(String service, String method, String key, String defaultValue) {
+        return URL.decode(getServiceMethodParameter(service, method, key, defaultValue));
+    }
+
+    public String getServiceMethodParameterStrict(String service, String method, String key) {
+        return getMethodParameterStrict(method, key);
+    }
+
+    public String getServiceMethodParameter(String service, String method, String key) {
+        return getMethodParameter(method, key);
+    }
+
+    public String getServiceMethodParameter(String service, String method, String key, String defaultValue) {
+        String value = getServiceMethodParameter(service, method, key);
+        return StringUtils.isEmpty(value) ? defaultValue : value;
+    }
+
+    public double getServiceMethodParameter(String service, String method, String key, double defaultValue) {
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.doubleValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        double d = Double.parseDouble(value);
+        updateCachedNumber(method, key, d);
+        return d;
+    }
+
+    public float getServiceMethodParameter(String service, String method, String key, float defaultValue) {
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.floatValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        float f = Float.parseFloat(value);
+        updateCachedNumber(method, key, f);
+        return f;
+    }
+
+    public long getServiceMethodParameter(String service, String method, String key, long defaultValue) {
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.longValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        long l = Long.parseLong(value);
+        updateCachedNumber(method, key, l);
+        return l;
+    }
+
+    public int getServiceMethodParameter(String service, String method, String key, int defaultValue) {
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.intValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        int i = Integer.parseInt(value);
+        updateCachedNumber(method, key, i);
+        return i;
+    }
+
+    public short getMethodParameter(String service, String method, String key, short defaultValue) { // NOTE(review): name differs from the getServiceMethodParameter overloads around it - probably a typo, confirm
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.shortValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue; // absent or empty values are not cached
+        }
+        short s = Short.parseShort(value);
+        updateCachedNumber(method, key, s);
+        return s;
+    }
+
+    public byte getServiceMethodParameter(String service, String method, String key, byte defaultValue) {
+        Number n = getCachedNumber(method, key);
+        if (n != null) {
+            return n.byteValue();
+        }
+        String value = getServiceMethodParameter(service, method, key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        byte b = Byte.parseByte(value);
+        updateCachedNumber(method, key, b);
+        return b;
+    }
+
+    public boolean hasServiceMethodParameter(String service, String method, String key) {
+        return hasMethodParameter(method, key);
+    }
+
+    public boolean hasServiceMethodParameter(String service, String method) {
+        return hasMethodParameter(method);
+    }
+
+    protected Map<String, Number> getServiceNumbers(String service) {
+        return getNumbers();
+    }
+
+    protected Map<String, Map<String, Number>> getServiceMethodNumbers(String service) {
+        return getMethodNumbers();
+    }
 
 }
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java
index 5cb18c0..ed7cb5e 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java
@@ -131,7 +131,7 @@ public class CacheTest {
         parameters.put("findCache.cache", "threadlocal");
         URL url = new URL("dubbo", "127.0.0.1", 29582, "org.apache.dubbo.config.cache.CacheService", parameters);
 
-        Invocation invocation = new RpcInvocation("findCache", CacheService.class.getName(), new Class[]{String.class}, new String[]{"0"}, null, null, null);
+        Invocation invocation = new RpcInvocation("findCache", CacheService.class.getName(), "", new Class[]{String.class}, new String[]{"0"}, null, null, null);
 
         Cache cache = cacheFactory.getCache(url, invocation);
         assertTrue(cache instanceof ThreadLocalCache);
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
index 7ba0a43..6bf38c9 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
@@ -25,4 +25,6 @@ public class MetadataConstants {
     public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd";
     public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay";
     public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000;
+    // Configuration key for the timeout put on the MetadataService proxy URL.
+    // It must differ from METADATA_PUBLISH_DELAY_KEY: sharing that key would make the
+    // publish-delay setting double as the proxy timeout.
+    public static final String METADATA_PROXY_TIMEOUT_KEY = "dubbo.application.metadata.proxy.timeout";
+    // Fallback used when METADATA_PROXY_TIMEOUT_KEY is not configured (presumably milliseconds - confirm).
+    public static final int DEFAULT_METADATA_TIMEOUT_VALUE = 5000;
 }
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 931a331..e3f6881 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
@@ -176,6 +177,8 @@ public class MetadataInfo implements Serializable {
         private transient Map<String, String> consumerParams;
         private transient Map<String, Map<String, String>> methodParams;
         private transient Map<String, Map<String, String>> consumerMethodParams;
+        private volatile transient Map<String, Number> numbers;
+        private volatile transient Map<String, Map<String, Number>> methodNumbers;
         private transient String serviceKey;
         private transient String matchKey;
 
@@ -382,6 +385,21 @@ public class MetadataInfo implements Serializable {
             }
         }
 
+        /**
+         * Returns the number cache of this service info, creating it on first access.
+         *
+         * @return the (possibly just created) cache map, never {@code null}
+         */
+        public Map<String, Number> getNumbers() {
+            if (numbers != null) {
+                return numbers;
+            }
+            // No locking: if two threads race here one map replaces the other, which is tolerated.
+            numbers = new ConcurrentHashMap<>();
+            return numbers;
+        }
+
+        /**
+         * Returns the per-method number cache of this service info, creating it on first access.
+         *
+         * @return the (possibly just created) cache map, never {@code null}
+         */
+        public Map<String, Map<String, Number>> getMethodNumbers() {
+            if (methodNumbers != null) {
+                return methodNumbers;
+            }
+            // No locking: if two threads race here one map replaces the other, which is tolerated.
+            methodNumbers = new ConcurrentHashMap<>();
+            return methodNumbers;
+        }
+
         @Override
         public boolean equals(Object obj) {
             if (obj == null) {
diff --git a/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java b/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java
index 2180b27..dce7e51 100644
--- a/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java
+++ b/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java
@@ -119,7 +119,7 @@ public class MonitorFilterTest {
     public void testFilter() throws Exception {
         MonitorFilter monitorFilter = new MonitorFilter();
         monitorFilter.setMonitorFactory(monitorFactory);
-        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]);
+        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]);
         RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
         Result result = monitorFilter.invoke(serviceInvoker, invocation);
         result.whenCompleteWithContext((r, t) -> {
@@ -149,7 +149,7 @@ public class MonitorFilterTest {
         MonitorFilter monitorFilter = new MonitorFilter();
         MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
         monitorFilter.setMonitorFactory(mockMonitorFactory);
-        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]);
+        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]);
         Invoker invoker = mock(Invoker.class);
         given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE));
 
@@ -162,7 +162,7 @@ public class MonitorFilterTest {
     public void testGenericFilter() throws Exception {
         MonitorFilter monitorFilter = new MonitorFilter();
         monitorFilter.setMonitorFactory(monitorFactory);
-        Invocation invocation = new RpcInvocation("$invoke", MonitorService.class.getName(), new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
+        Invocation invocation = new RpcInvocation("$invoke", MonitorService.class.getName(), "", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
         RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
         Result result = monitorFilter.invoke(serviceInvoker, invocation);
         result.whenCompleteWithContext((r, t) -> {
@@ -196,7 +196,7 @@ public class MonitorFilterTest {
 
         monitorFilter.setMonitorFactory(mockMonitorFactory);
         given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor);
-        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]);
+        Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]);
 
         monitorFilter.invoke(serviceInvoker, invocation);
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 3ee5dd2..3c7204d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -146,6 +146,14 @@ public class DefaultServiceInstance implements ServiceInstance {
         return extendParams;
     }
 
+    /**
+     * Builds a new map holding the instance metadata plus the extended parameters.
+     * Extended parameters are copied last, so they win when both maps contain the same key.
+     *
+     * @return a fresh, independent map; changes to it do not affect this instance
+     */
+    @Override
+    public Map<String, String> getAllParams() {
+        int expectedEntries = metadata.size() + extendParams.size();
+        // Pre-size so the copies below do not trigger a rehash.
+        Map<String, String> merged = new HashMap<>((int) (expectedEntries / 0.75f + 1));
+        merged.putAll(metadata);
+        merged.putAll(extendParams);
+        return merged;
+    }
+
     public void setMetadata(Map<String, String> metadata) {
         this.metadata = metadata;
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 149c6e2..ca8f9be 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -23,30 +23,31 @@ import org.apache.dubbo.rpc.RpcContext;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 
-/**
- * FIXME, replace RpcContext operations with explicitly defined APIs
- */
 public class InstanceAddressURL extends URL {
     private ServiceInstance instance;
     private MetadataInfo metadataInfo;
+    private volatile transient Map<String, Number> numbers;
+    private volatile transient Map<String, Map<String, Number>> methodNumbers;
+
+    /**
+     * Creates a URL with no {@code ServiceInstance} and no {@code MetadataInfo}: both fields stay
+     * {@code null}, and host and port are not assigned here. Callers must check
+     * {@link #getInstance()} for {@code null} before using instance-backed accessors.
+     */
+    public InstanceAddressURL() {
+    }
 
     public InstanceAddressURL(
             ServiceInstance instance,
             MetadataInfo metadataInfo
     ) {
-//        super()
         this.instance = instance;
         this.metadataInfo = metadataInfo;
         this.host = instance.getHost();
         this.port = instance.getPort();
     }
 
-
     public ServiceInstance getInstance() {
         return instance;
     }
@@ -74,6 +75,11 @@ public class InstanceAddressURL extends URL {
     }
 
     @Override
+    public String getProtocolServiceKey() {
+        // Resolved from the current RpcContext rather than from state stored on this URL.
+        return RpcContext.getContext().getProtocolServiceKey();
+    }
+
+    @Override
     public String getServiceKey() {
         return RpcContext.getContext().getServiceKey();
     }
@@ -99,33 +105,32 @@ public class InstanceAddressURL extends URL {
             return getServiceInterface();
         }
 
-        String value = getInstanceMetadata().get(key);
-        if (StringUtils.isEmpty(value) && metadataInfo != null) {
-            value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey());
+        String protocolServiceKey = getProtocolServiceKey();
+        if (protocolServiceKey == null) {
+            return getInstanceParameter(key);
         }
-        return value;
+        return getServiceParameter(protocolServiceKey, key);
     }
 
     @Override
-    public String getParameter(String key, String defaultValue) {
-        if (VERSION_KEY.equals(key)) {
-            return getVersion();
-        } else if (GROUP_KEY.equals(key)) {
-            return getGroup();
-        } else if (INTERFACE_KEY.equals(key)) {
-            return getServiceInterface();
-        }
-
-        String value = getParameter(key);
-        if (StringUtils.isEmpty(value)) {
-            return defaultValue;
+    public String getServiceParameter(String service, String key) {
+        String value = getInstanceParameter(key);
+        if (StringUtils.isEmpty(value) && metadataInfo != null) {
+            value = metadataInfo.getParameter(key, service);
         }
         return value;
     }
 
+    /**
+     * method parameter only exists in ServiceInfo
+     *
+     * @param method
+     * @param key
+     * @return
+     */
     @Override
-    public String getMethodParameter(String method, String key) {
-        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+    public String getServiceMethodParameter(String protocolServiceKey, String method, String key) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey);
         String value = serviceInfo.getMethodParameter(method, key, null);
         if (StringUtils.isNotEmpty(value)) {
             return value;
@@ -134,8 +139,24 @@ public class InstanceAddressURL extends URL {
     }
 
     @Override
-    public boolean hasMethodParameter(String method, String key) {
-        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+    public String getMethodParameter(String method, String key) {
+        // Without a protocol service key in the current context there is no ServiceInfo to consult.
+        String contextServiceKey = getProtocolServiceKey();
+        return contextServiceKey == null
+                ? null
+                : getServiceMethodParameter(contextServiceKey, method, key);
+    }
+
+    /**
+     * method parameter only exists in ServiceInfo
+     *
+     * @param method
+     * @param key
+     * @return
+     */
+    @Override
+    public boolean hasServiceMethodParameter(String protocolServiceKey, String method, String key) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey);
 
         if (method == null) {
             String suffix = "." + key;
@@ -160,20 +181,44 @@ public class InstanceAddressURL extends URL {
     }
 
     @Override
-    public boolean hasMethodParameter(String method) {
-        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+    public boolean hasMethodParameter(String method, String key) {
+        // Without a protocol service key in the current context the answer is always false.
+        String contextServiceKey = getProtocolServiceKey();
+        return contextServiceKey != null
+                && hasServiceMethodParameter(contextServiceKey, method, key);
+    }
+
+    /**
+     * method parameter only exists in ServiceInfo
+     *
+     * @param method
+     * @return
+     */
+    @Override
+    public boolean hasServiceMethodParameter(String protocolServiceKey, String method) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey);
         return serviceInfo.hasMethodParameter(method);
     }
 
+    @Override
+    public boolean hasMethodParameter(String method) {
+        // Without a protocol service key in the current context the answer is always false.
+        String contextServiceKey = getProtocolServiceKey();
+        return contextServiceKey != null
+                && hasServiceMethodParameter(contextServiceKey, method);
+    }
+
+
     /**
      * Avoid calling this method in RPC call.
      *
      * @return
      */
     @Override
-    public Map<String, String> getParameters() {
+    public Map<String, String> getServiceParameters(String protocolServiceKey) {
         Map<String, String> instanceParams = getInstanceMetadata();
-        Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey()));
+        Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(protocolServiceKey));
         int i = instanceParams == null ? 0 : instanceParams.size();
         int j = metadataParams == null ? 0 : metadataParams.size();
         Map<String, String> params = new HashMap<>((int) ((i + j) / 0.75) + 1);
@@ -186,8 +231,13 @@ public class InstanceAddressURL extends URL {
         return params;
     }
 
-    private Map<String, String> getInstanceMetadata() {
-        return this.instance.getMetadata();
+    @Override
+    public Map<String, String> getParameters() {
+        String protocolServiceKey = getProtocolServiceKey();
+        if (protocolServiceKey == null) {
+            return getInstance().getAllParams();
+        }
+        return getServiceParameters(protocolServiceKey);
     }
 
     @Override
@@ -196,8 +246,7 @@ public class InstanceAddressURL extends URL {
             return this;
         }
 
-        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
-        getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value);
+        getInstance().getExtendParams().put(key, value);
         return this;
     }
 
@@ -207,18 +256,84 @@ public class InstanceAddressURL extends URL {
             return this;
         }
 
-        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getInstance().getExtendParams().putIfAbsent(key, value);
+        return this;
+    }
+
+    /**
+     * Adds a parameter to the {@code ServiceInfo} registered under {@code protocolServiceKey}.
+     * Nothing is changed when {@code key} or {@code value} is empty.
+     *
+     * @param protocolServiceKey key identifying the target service info
+     * @param key                parameter name
+     * @param value              parameter value
+     * @return this URL (the instance is modified in place, no copy is made)
+     */
+    public URL addServiceParameter(String protocolServiceKey, String key, String value) {
+        boolean hasKeyAndValue = !StringUtils.isEmpty(key) && !StringUtils.isEmpty(value);
+        if (hasKeyAndValue) {
+            getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value);
+        }
+        return this;
+    }
+
+    public URL addServiceParameterIfAbsent(String protocolServiceKey, String key, String value) {
+        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+            return this;
+        }
+
         getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value);
         return this;
     }
 
-    public URL addConsumerParams(Map<String, String> params) {
-        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+    public URL addConsumerParams(String protocolServiceKey, Map<String, String> params) {
         getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params);
         return this;
     }
 
+    /**
+     * Returns the number cache kept on the {@code ServiceInfo} registered under
+     * {@code protocolServiceKey}.
+     * NOTE(review): no null check is made on the looked-up service info - confirm callers only pass known keys.
+     */
+    @Override
+    protected Map<String, Number> getServiceNumbers(String protocolServiceKey) {
+        return getServiceInfo(protocolServiceKey).getNumbers();
+    }
+
+    /**
+     * Picks the number cache for the current context: the per-service cache when the context
+     * supplies a protocol service key, otherwise a cache held on this URL (created on first use).
+     */
+    @Override
+    protected Map<String, Number> getNumbers() {
+        String contextServiceKey = getProtocolServiceKey();
+        if (contextServiceKey != null) {
+            return getServiceNumbers(contextServiceKey);
+        }
+        // No locking: a racing initialization just replaces the map, which is tolerated.
+        if (numbers == null) {
+            numbers = new ConcurrentHashMap<>();
+        }
+        return numbers;
+    }
+
+    /**
+     * Returns the per-method number cache kept on the {@code ServiceInfo} registered under
+     * {@code protocolServiceKey}.
+     * NOTE(review): no null check is made on the looked-up service info - confirm callers only pass known keys.
+     */
+    @Override
+    protected Map<String, Map<String, Number>> getServiceMethodNumbers(String protocolServiceKey) {
+        return getServiceInfo(protocolServiceKey).getMethodNumbers();
+    }
+
+    /**
+     * Picks the per-method number cache for the current context: the per-service cache when the
+     * context supplies a protocol service key, otherwise a cache held on this URL (created on first use).
+     */
+    @Override
+    protected Map<String, Map<String, Number>> getMethodNumbers() {
+        String contextServiceKey = getProtocolServiceKey();
+        if (contextServiceKey != null) {
+            return getServiceMethodNumbers(contextServiceKey);
+        }
+        // No locking: a racing initialization just replaces the map, which is tolerated.
+        if (methodNumbers == null) {
+            methodNumbers = new ConcurrentHashMap<>();
+        }
+        return methodNumbers;
+    }
+
+    /**
+     * Looks up the {@code ServiceInfo} for {@code protocolServiceKey} in {@code metadataInfo}.
+     * NOTE(review): dereferences {@code metadataInfo} without a null check, although other
+     * accessors in this class guard against it being {@code null}.
+     */
+    private MetadataInfo.ServiceInfo getServiceInfo(String protocolServiceKey) {
+        return metadataInfo.getServiceInfo(protocolServiceKey);
+    }
+
+    /**
+     * Resolves an instance-level parameter: the instance metadata is preferred, and the extended
+     * parameters are used only when the metadata has no non-empty value for {@code key}.
+     */
+    private String getInstanceParameter(String key) {
+        String fromMetadata = this.instance.getMetadata().get(key);
+        return StringUtils.isNotEmpty(fromMetadata)
+                ? fromMetadata
+                : this.instance.getExtendParams().get(key);
+    }
+
+    /**
+     * Returns the metadata map exactly as exposed by the wrapped instance (no copy is made here).
+     */
+    private Map<String, String> getInstanceMetadata() {
+        return this.instance.getMetadata();
+    }
+
+    @Override
     public boolean equals(Object obj) {
         // instance metadata equals
         if (obj == null) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index e01a36e..9c725ac 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -57,49 +57,53 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
     public synchronized void notify(List<URL> instanceUrls) {
         // Set the context of the address notification thread.
         RpcContext.setRpcContext(getConsumerUrl());
-        if (CollectionUtils.isEmpty(instanceUrls)) {
-            // FIXME, empty protocol
-        }
         refreshInvoker(instanceUrls);
     }
 
     private void refreshInvoker(List<URL> invokerUrls) {
-        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty:// to clear address.");
+        Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty InstanceAddressURL to clear address.");
 
-        if (invokerUrls.size() == 1
-                && invokerUrls.get(0) != null
-                && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
-            this.forbidden = true; // Forbid to access
-            this.invokers = Collections.emptyList();
-            routerChain.setInvokers(this.invokers);
-            destroyAllInvokers(); // Close all invokers
-        } else {
-            this.forbidden = false; // Allow to access
-            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
-            if (CollectionUtils.isEmpty(invokerUrls)) {
-                return;
+        if (invokerUrls.size() == 1) {
+            URL url = invokerUrls.get(0);
+            if (!(url instanceof InstanceAddressURL)) {
+                throw new IllegalStateException("use empty InstanceAddressURL to clear address");
+            } else {
+                InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
+                if (instanceAddressURL.getInstance() == null) {
+                    this.forbidden = true; // Forbid to access
+                    this.invokers = Collections.emptyList();
+                    routerChain.setInvokers(this.invokers);
+                    destroyAllInvokers(); // Close all invokers
+                    return;
+                }
             }
+        }
 
-            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+        this.forbidden = false; // Allow to access
+        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+        if (CollectionUtils.isEmpty(invokerUrls)) {
+            return;
+        }
 
-            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
-                logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
-                return;
-            }
+        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
 
-            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
-            // pre-route and build cache, notice that route cache should build on original Invoker list.
-            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
-            routerChain.setInvokers(newInvokers);
-            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
-            this.urlInvokerMap = newUrlInvokerMap;
+        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
+            logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
+            return;
+        }
 
-            if (oldUrlInvokerMap != null) {
-                try {
-                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
-                } catch (Exception e) {
-                    logger.warn("destroyUnusedInvokers error. ", e);
-                }
+        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
+        // pre-route and build cache, notice that route cache should build on original Invoker list.
+        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
+        routerChain.setInvokers(newInvokers);
+        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
+        this.urlInvokerMap = newUrlInvokerMap;
+
+        if (oldUrlInvokerMap != null) {
+            try {
+                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
+            } catch (Exception e) {
+                logger.warn("destroyUnusedInvokers error. ", e);
             }
         }
     }
@@ -129,7 +133,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
             }
 
             // FIXME, some keys may need to be removed.
-            instanceAddressURL.addConsumerParams(queryMap);
+            instanceAddressURL.addConsumerParams(getConsumerUrl().getProtocolServiceKey(), queryMap);
 
             Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress());
             if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 9362811..7b1890d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -86,6 +86,8 @@ public interface ServiceInstance extends Serializable {
 
     Map<String, String> getExtendParams();
 
+    Map<String, String> getAllParams();
+
     /**
      * @return the hash code of current instance.
      */
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 8af48a3..37b6bf0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client.event.listener;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.event.ConditionalEventListener;
 import org.apache.dubbo.event.EventListener;
 import org.apache.dubbo.metadata.MetadataInfo;
@@ -26,6 +27,7 @@ import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.InstanceAddressURL;
 import org.apache.dubbo.registry.client.RegistryClusterIdentifier;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
@@ -161,12 +163,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
                 RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
                 metadataInfo = remoteMetadataService.getMetadata(instance);
             } else {
-                MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance);
+                MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
                 metadataInfo = metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
             }
         } catch (Exception e) {
-            // TODO, load metadata backup
+            logger.error("Failed to load service metadata, metadata type is " + metadataType, e);
             metadataInfo = null;
+            // TODO, load metadata backup. Stop getting metadata after x times of failure for one revision?
         }
         return metadataInfo;
     }
@@ -174,7 +177,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
     private void notifyAddressChanged() {
         listeners.forEach((key, notifyListener) -> {
             //FIXME, group wildcard match
-            notifyListener.notify(serviceUrls.get(key));
+            List<URL> urls = serviceUrls.get(key);
+            if (CollectionUtils.isEmpty(urls)) {
+                urls = new ArrayList<>();
+                urls.add(new InstanceAddressURL());
+            }
+            notifyListener.notify(urls);
         });
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index 2685a4a..b65dba1 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
 import org.apache.dubbo.rpc.Invoker;
@@ -67,7 +68,7 @@ public class MetadataUtils {
         getRemoteMetadataService().publishServiceDefinition(url);
     }
 
-    public static MetadataService getMetadataServiceProxy(ServiceInstance instance) {
+    public static MetadataService getMetadataServiceProxy(ServiceInstance instance, ServiceDiscovery serviceDiscovery) {
         String key = instance.getServiceName() + "##" +
                 ServiceInstanceMetadataUtils.getExportedServicesRevision(instance);
         return metadataServiceProxies.computeIfAbsent(key, k -> {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java
index 026146e..6f6a501 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.registry.client.metadata;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.registry.client.ServiceInstance;
 
@@ -28,6 +29,9 @@ import java.util.Map;
 import static java.lang.String.valueOf;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PORT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_TIMEOUT_VALUE;
+import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PROXY_TIMEOUT_KEY;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getMetadataServiceURLsParams;
 
 /**
@@ -64,7 +68,8 @@ public class StandardMetadataServiceURLBuilder implements MetadataServiceURLBuil
                     .setHost(host)
                     .setPort(port)
                     .setProtocol(protocol)
-                    .setPath(MetadataService.class.getName());
+                    .setPath(MetadataService.class.getName())
+                    .addParameter(TIMEOUT_KEY, ConfigurationUtils.get(METADATA_PROXY_TIMEOUT_KEY, DEFAULT_METADATA_TIMEOUT_VALUE));
 
             // add parameters
             params.forEach((name, value) -> urlBuilder.addParameter(name, valueOf(value)));
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java
index ac02fee..8422baf 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java
@@ -32,6 +32,8 @@ public interface Invocation {
 
     String getTargetServiceUniqueName();
 
+    String getProtocolServiceKey();
+
     /**
      * get method name.
      *
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 2c3d2b8..8077067 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -800,60 +800,48 @@ public class RpcContext {
     }
 
     // RPC service context updated before each service call.
-    private String group;
-    private String version;
-    private String interfaceName;
-    private String protocol;
-    private String serviceKey;
-    private String protocolServiceKey;
     private URL consumerUrl;
 
     public String getGroup() {
-        return group;
-    }
-
-    public void setGroup(String group) {
-        this.group = group;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getParameter(GROUP_KEY);
     }
 
     public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getParameter(VERSION_KEY);
     }
 
     public String getInterfaceName() {
-        return interfaceName;
-    }
-
-    public void setInterfaceName(String interfaceName) {
-        this.interfaceName = interfaceName;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getServiceInterface();
     }
 
     public String getProtocol() {
-        return protocol;
-    }
-
-    public void setProtocol(String protocol) {
-        this.protocol = protocol;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getParameter(PROTOCOL_KEY, DUBBO);
     }
 
     public String getServiceKey() {
-        return serviceKey;
-    }
-
-    public void setServiceKey(String serviceKey) {
-        this.serviceKey = serviceKey;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getServiceKey();
     }
 
     public String getProtocolServiceKey() {
-        return protocolServiceKey;
-    }
-
-    public void setProtocolServiceKey(String protocolServiceKey) {
-        this.protocolServiceKey = protocolServiceKey;
+        if (consumerUrl == null) {
+            return null;
+        }
+        return consumerUrl.getProtocolServiceKey();
     }
 
     public URL getConsumerUrl() {
@@ -867,11 +855,5 @@ public class RpcContext {
     public static void setRpcContext(URL url) {
         RpcContext rpcContext = RpcContext.getContext();
         rpcContext.setConsumerUrl(url);
-        rpcContext.setInterfaceName(url.getServiceInterface());
-        rpcContext.setVersion(url.getParameter(VERSION_KEY));
-        rpcContext.setGroup(url.getParameter(GROUP_KEY));
-        rpcContext.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO));
-        rpcContext.setServiceKey(url.getServiceKey());
-        rpcContext.setProtocolServiceKey(url.getProtocolServiceKey());
     }
 }
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index 87baa42..4a78cd6 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -51,6 +51,7 @@ public class RpcInvocation implements Invocation, Serializable {
     private static final long serialVersionUID = -4355285085441097045L;
 
     private String targetServiceUniqueName;
+    private String protocolServiceKey;
 
     private String methodName;
     private String serviceName;
@@ -83,8 +84,8 @@ public class RpcInvocation implements Invocation, Serializable {
     }
 
     public RpcInvocation(Invocation invocation, Invoker<?> invoker) {
-        this(invocation.getMethodName(), invocation.getServiceName(), invocation.getParameterTypes(),
-                invocation.getArguments(), new HashMap<>(invocation.getObjectAttachments()),
+        this(invocation.getMethodName(), invocation.getServiceName(), invocation.getProtocolServiceKey(),
+                invocation.getParameterTypes(), invocation.getArguments(), new HashMap<>(invocation.getObjectAttachments()),
                 invocation.getInvoker(), invocation.getAttributes());
         if (invoker != null) {
             URL url = invoker.getUrl();
@@ -109,35 +110,37 @@ public class RpcInvocation implements Invocation, Serializable {
             }
         }
         this.targetServiceUniqueName = invocation.getTargetServiceUniqueName();
+        this.protocolServiceKey = invocation.getProtocolServiceKey();
     }
 
     public RpcInvocation(Invocation invocation) {
-        this(invocation.getMethodName(), invocation.getServiceName(), invocation.getParameterTypes(),
+        this(invocation.getMethodName(), invocation.getServiceName(), invocation.getProtocolServiceKey(), invocation.getParameterTypes(),
                 invocation.getArguments(), invocation.getObjectAttachments(), invocation.getInvoker(), invocation.getAttributes());
         this.targetServiceUniqueName = invocation.getTargetServiceUniqueName();
     }
 
-    public RpcInvocation(Method method, String serviceName, Object[] arguments) {
-        this(method, serviceName, arguments, null, null);
+    public RpcInvocation(Method method, String serviceName, String protocolServiceKey, Object[] arguments) {
+        this(method, serviceName, protocolServiceKey, arguments, null, null);
     }
 
-    public RpcInvocation(Method method, String serviceName, Object[] arguments, Map<String, Object> attachment, Map<Object, Object> attributes) {
-        this(method.getName(), serviceName, method.getParameterTypes(), arguments, attachment, null, attributes);
+    public RpcInvocation(Method method, String serviceName, String protocolServiceKey, Object[] arguments, Map<String, Object> attachment, Map<Object, Object> attributes) {
+        this(method.getName(), serviceName, protocolServiceKey, method.getParameterTypes(), arguments, attachment, null, attributes);
         this.returnType = method.getReturnType();
     }
 
-    public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments) {
-        this(methodName, serviceName, parameterTypes, arguments, null, null, null);
+    public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments) {
+        this(methodName, serviceName, protocolServiceKey, parameterTypes, arguments, null, null, null);
     }
 
-    public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments) {
-        this(methodName, serviceName, parameterTypes, arguments, attachments, null, null);
+    public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments) {
+        this(methodName, serviceName, protocolServiceKey, parameterTypes, arguments, attachments, null, null);
     }
 
-    public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments,
+    public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments,
                          Map<String, Object> attachments, Invoker<?> invoker, Map<Object, Object> attributes) {
         this.methodName = methodName;
         this.serviceName = serviceName;
+        this.protocolServiceKey = protocolServiceKey;
         this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
         this.arguments = arguments == null ? new Object[0] : arguments;
         this.attachments = attachments == null ? new HashMap<>() : attachments;
@@ -194,6 +197,11 @@ public class RpcInvocation implements Invocation, Serializable {
         return targetServiceUniqueName;
     }
 
+    @Override
+    public String getProtocolServiceKey() {
+        return protocolServiceKey;
+    }
+
     public void setTargetServiceUniqueName(String targetServiceUniqueName) {
         this.targetServiceUniqueName = targetServiceUniqueName;
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java
index 13c29b6..0234139 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java
@@ -140,7 +140,7 @@ public class GenericFilter implements Filter, Filter.Listener {
                     }
                 }
 
-                RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getObjectAttachments(), inv.getAttributes());
+                RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), invoker.getUrl().getProtocolServiceKey(), args, inv.getObjectAttachments(), inv.getAttributes());
                 rpcInvocation.setInvoker(inv.getInvoker());
                 rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 90e101d..0865f08 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -45,7 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * AbstractInvoker.
+ * This Invoker works on the consumer side.
  */
 public abstract class AbstractInvoker<T> implements Invoker<T> {
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 6d74825..1b1d592 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -33,7 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
 /**
- * InvokerWrapper
+ * This Invoker works on the provider side and delegates RPC calls to the interface implementation.
  */
 public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
     Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index 0eae664..69f1d06 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.proxy;
 
+import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.rpc.Constants;
@@ -35,10 +36,14 @@ public class InvokerInvocationHandler implements InvocationHandler {
     private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
     private final Invoker<?> invoker;
     private ConsumerModel consumerModel;
+    private URL url;
+    private String protocolServiceKey;
 
     public InvokerInvocationHandler(Invoker<?> handler) {
         this.invoker = handler;
-        String serviceKey = invoker.getUrl().getServiceKey();
+        this.url = invoker.getUrl();
+        String serviceKey = this.url.getServiceKey();
+        this.protocolServiceKey = this.url.getProtocolServiceKey();
         if (serviceKey != null) {
             this.consumerModel = ApplicationModel.getConsumerModel(serviceKey);
         }
@@ -63,7 +68,7 @@ public class InvokerInvocationHandler implements InvocationHandler {
         } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
             return invoker.equals(args[0]);
         }
-        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
+        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
         String serviceKey = invoker.getUrl().getServiceKey();
         rpcInvocation.setTargetServiceUniqueName(serviceKey);
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
index 53f8921..2f32ff6 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java
@@ -51,7 +51,7 @@ public class ExceptionFilterTest {
         RpcException exception = new RpcException("TestRpcException");
 
         ExceptionFilter exceptionFilter = new ExceptionFilter();
-        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"});
+        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"});
         Invoker<DemoService> invoker = mock(Invoker.class);
         given(invoker.getInterface()).willReturn(DemoService.class);
         given(invoker.invoke(eq(invocation))).willThrow(exception);
@@ -75,7 +75,7 @@ public class ExceptionFilterTest {
     public void testJavaException() {
 
         ExceptionFilter exceptionFilter = new ExceptionFilter();
-        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"});
+        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"});
 
         AppResponse appResponse = new AppResponse();
         appResponse.setException(new IllegalArgumentException("java"));
@@ -95,7 +95,7 @@ public class ExceptionFilterTest {
     public void testRuntimeException() {
 
         ExceptionFilter exceptionFilter = new ExceptionFilter();
-        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"});
+        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"});
 
         AppResponse appResponse = new AppResponse();
         appResponse.setException(new LocalException("localException"));
@@ -115,7 +115,7 @@ public class ExceptionFilterTest {
     public void testConvertToRunTimeException() throws Exception {
 
         ExceptionFilter exceptionFilter = new ExceptionFilter();
-        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"});
+        RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"});
 
         AppResponse mockRpcResult = new AppResponse();
         mockRpcResult.setException(new HessianException("hessian"));
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java
index e06d2b7..c49aa92 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java
@@ -54,7 +54,7 @@ public class GenericFilterTest {
         person.put("name", "dubbo");
         person.put("age", 10);
 
-        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(),
+        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes(),
                 new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}});
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
@@ -82,7 +82,7 @@ public class GenericFilterTest {
             person.put("name", "dubbo");
             person.put("age", 10);
 
-            RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(),
+            RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes(),
                     new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}});
             invocation.setAttachment(GENERIC_KEY, GENERIC_SERIALIZATION_NATIVE_JAVA);
 
@@ -106,7 +106,7 @@ public class GenericFilterTest {
         person.put("name", "dubbo");
         person.put("age", 10);
 
-        RpcInvocation invocation = new RpcInvocation("sayHi", GenericService.class.getName(), genericInvoke.getParameterTypes()
+        RpcInvocation invocation = new RpcInvocation("sayHi", GenericService.class.getName(), "", genericInvoke.getParameterTypes()
                 , new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}});
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
@@ -130,7 +130,7 @@ public class GenericFilterTest {
         person.put("name", "dubbo");
         person.put("age", 10);
 
-        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes()
+        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes()
                 , new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}});
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
index a1908fb..b7828aa 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java
@@ -49,7 +49,7 @@ public class GenericImplFilterTest {
     public void testInvoke() throws Exception {
 
         RpcInvocation invocation = new RpcInvocation("getPerson", "org.apache.dubbo.rpc.support.DemoService",
-                new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)});
+                "org.apache.dubbo.rpc.support.DemoService:dubbo", new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)});
 
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
@@ -77,7 +77,7 @@ public class GenericImplFilterTest {
     public void testInvokeWithException() throws Exception {
 
         RpcInvocation invocation = new RpcInvocation("getPerson", "org.apache.dubbo.rpc.support.DemoService",
-                new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)});
+                "org.apache.dubbo.rpc.support.DemoService:dubbo", new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)});
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
                 "accesslog=true&group=dubbo&version=1.1&generic=true");
@@ -104,8 +104,8 @@ public class GenericImplFilterTest {
         person.put("name", "dubbo");
         person.put("age", 10);
 
-        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(),
-                new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}});
+        RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "org.apache.dubbo.rpc.support.DemoService:dubbo",
+                genericInvoke.getParameterTypes(), new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}});
 
         URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" +
                 "accesslog=true&group=dubbo&version=1.1&generic=true");
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java
index 1abaf0e..b505ee8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java
@@ -51,7 +51,7 @@ public abstract class AbstractProxyTest {
         //Assertions.assertEquals(proxy.toString(), invoker.toString());
         //Assertions.assertEquals(proxy.hashCode(), invoker.hashCode());
 
-        Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), new Class[]{String.class}, new Object[]{"aa"})).getValue()
+        Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), DemoService.class.getName() + ":dubbo", new Class[]{String.class}, new Object[]{"aa"})).getValue()
                 , proxy.echo("aa"));
     }
 
@@ -65,7 +65,7 @@ public abstract class AbstractProxyTest {
 
         Assertions.assertEquals(invoker.getInterface(), DemoService.class);
 
-        Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), new Class[]{String.class}, new Object[]{"aa"})).getValue(),
+        Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), DemoService.class.getName() + ":dubbo", new Class[]{String.class}, new Object[]{"aa"})).getValue(),
                 origin.echo("aa"));
 
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
index bdf77b3..586c990 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
@@ -52,6 +52,11 @@ public class MockInvocation implements Invocation {
         return null;
     }
 
+    @Override
+    public String getProtocolServiceKey() {
+        return null;
+    }
+
     public String getMethodName() {
         return "echo";
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java
index f70d8b6..00deb91 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java
@@ -51,7 +51,7 @@ public class RpcUtilsTest {
         URL url = URL.valueOf("dubbo://localhost/?test.async=true");
         Map<String, Object> attachments = new HashMap<>();
         attachments.put("aa", "bb");
-        Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}, attachments);
+        Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}, attachments);
         RpcUtils.attachInvocationIdIfAsync(url, inv);
         long id1 = RpcUtils.getInvocationId(inv);
         RpcUtils.attachInvocationIdIfAsync(url, inv);
@@ -68,7 +68,7 @@ public class RpcUtilsTest {
     @Test
     public void testAttachInvocationIdIfAsync_sync() {
         URL url = URL.valueOf("dubbo://localhost/");
-        Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{});
+        Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{});
         RpcUtils.attachInvocationIdIfAsync(url, inv);
         assertNull(RpcUtils.getInvocationId(inv));
     }
@@ -80,7 +80,7 @@ public class RpcUtilsTest {
     @Test
     public void testAttachInvocationIdIfAsync_nullAttachments() {
         URL url = URL.valueOf("dubbo://localhost/?test.async=true");
-        Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{});
+        Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{});
         RpcUtils.attachInvocationIdIfAsync(url, inv);
         assertTrue(RpcUtils.getInvocationId(inv) >= 0L);
     }
@@ -92,7 +92,7 @@ public class RpcUtilsTest {
     @Test
     public void testAttachInvocationIdIfAsync_forceNotAttache() {
         URL url = URL.valueOf("dubbo://localhost/?test.async=true&" + AUTO_ATTACH_INVOCATIONID_KEY + "=false");
-        Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{});
+        Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{});
         RpcUtils.attachInvocationIdIfAsync(url, inv);
         assertNull(RpcUtils.getInvocationId(inv));
     }
@@ -104,7 +104,7 @@ public class RpcUtilsTest {
     @Test
     public void testAttachInvocationIdIfAsync_forceAttache() {
         URL url = URL.valueOf("dubbo://localhost/?" + AUTO_ATTACH_INVOCATIONID_KEY + "=true");
-        Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{});
+        Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{});
         RpcUtils.attachInvocationIdIfAsync(url, inv);
         assertNotNull(RpcUtils.getInvocationId(inv));
     }
@@ -117,30 +117,30 @@ public class RpcUtilsTest {
         given(invoker.getUrl()).willReturn(URL.valueOf("test://127.0.0.1:1/org.apache.dubbo.rpc.support.DemoService?interface=org.apache.dubbo.rpc.support.DemoService"));
 
         // void sayHello(String name);
-        RpcInvocation inv = new RpcInvocation("sayHello", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv = new RpcInvocation("sayHello", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         Class<?> returnType = RpcUtils.getReturnType(inv);
         Assertions.assertNull(returnType);
 
         //String echo(String text);
-        RpcInvocation inv1 = new RpcInvocation("echo", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv1 = new RpcInvocation("echo", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         Class<?> returnType1 = RpcUtils.getReturnType(inv1);
         Assertions.assertNotNull(returnType1);
         Assertions.assertEquals(String.class, returnType1);
 
         //int getSize(String[] strs);
-        RpcInvocation inv2 = new RpcInvocation("getSize", serviceName, new Class<?>[]{String[].class}, null, null, invoker, null);
+        RpcInvocation inv2 = new RpcInvocation("getSize", serviceName, "", new Class<?>[]{String[].class}, null, null, invoker, null);
         Class<?> returnType2 = RpcUtils.getReturnType(inv2);
         Assertions.assertNotNull(returnType2);
         Assertions.assertEquals(int.class, returnType2);
 
         //Person getPerson(Person person);
-        RpcInvocation inv3 = new RpcInvocation("getPerson", serviceName, new Class<?>[]{Person.class}, null, null, invoker, null);
+        RpcInvocation inv3 = new RpcInvocation("getPerson", serviceName, "", new Class<?>[]{Person.class}, null, null, invoker, null);
         Class<?> returnType3 = RpcUtils.getReturnType(inv3);
         Assertions.assertNotNull(returnType3);
         Assertions.assertEquals(Person.class, returnType3);
 
         //List<String> testReturnType1(String str);
-        RpcInvocation inv4 = new RpcInvocation("testReturnType1", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv4 = new RpcInvocation("testReturnType1", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         Class<?> returnType4 = RpcUtils.getReturnType(inv4);
         Assertions.assertNotNull(returnType4);
         Assertions.assertEquals(List.class, returnType4);
@@ -154,7 +154,7 @@ public class RpcUtilsTest {
         Invoker invoker = mock(Invoker.class);
         given(invoker.getUrl()).willReturn(URL.valueOf("test://127.0.0.1:1/org.apache.dubbo.rpc.support.DemoService?interface=org.apache.dubbo.rpc.support.DemoService"));
 
-        RpcInvocation inv = new RpcInvocation("testReturnType", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv = new RpcInvocation("testReturnType", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         Type[] types = RpcUtils.getReturnTypes(inv);
         Assertions.assertNotNull(types);
         Assertions.assertEquals(2, types.length);
@@ -162,7 +162,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(String.class, types[1]);
         Assertions.assertArrayEquals(types, inv.getReturnTypes());
 
-        RpcInvocation inv1 = new RpcInvocation("testReturnType1", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv1 = new RpcInvocation("testReturnType1", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         java.lang.reflect.Type[] types1 = RpcUtils.getReturnTypes(inv1);
         Assertions.assertNotNull(types1);
         Assertions.assertEquals(2, types1.length);
@@ -170,7 +170,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(demoServiceClass.getMethod("testReturnType1", String.class).getGenericReturnType(), types1[1]);
         Assertions.assertArrayEquals(types1, inv1.getReturnTypes());
 
-        RpcInvocation inv2 = new RpcInvocation("testReturnType2", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv2 = new RpcInvocation("testReturnType2", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         java.lang.reflect.Type[] types2 = RpcUtils.getReturnTypes(inv2);
         Assertions.assertNotNull(types2);
         Assertions.assertEquals(2, types2.length);
@@ -178,7 +178,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(String.class, types2[1]);
         Assertions.assertArrayEquals(types2, inv2.getReturnTypes());
 
-        RpcInvocation inv3 = new RpcInvocation("testReturnType3", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv3 = new RpcInvocation("testReturnType3", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         java.lang.reflect.Type[] types3 = RpcUtils.getReturnTypes(inv3);
         Assertions.assertNotNull(types3);
         Assertions.assertEquals(2, types3.length);
@@ -187,7 +187,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(((ParameterizedType) genericReturnType3).getActualTypeArguments()[0], types3[1]);
         Assertions.assertArrayEquals(types3, inv3.getReturnTypes());
 
-        RpcInvocation inv4 = new RpcInvocation("testReturnType4", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv4 = new RpcInvocation("testReturnType4", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         java.lang.reflect.Type[] types4 = RpcUtils.getReturnTypes(inv4);
         Assertions.assertNotNull(types4);
         Assertions.assertEquals(2, types4.length);
@@ -195,7 +195,7 @@ public class RpcUtilsTest {
         Assertions.assertNull(types4[1]);
         Assertions.assertArrayEquals(types4, inv4.getReturnTypes());
 
-        RpcInvocation inv5 = new RpcInvocation("testReturnType5", serviceName, new Class<?>[]{String.class}, null, null, invoker, null);
+        RpcInvocation inv5 = new RpcInvocation("testReturnType5", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null);
         java.lang.reflect.Type[] types5 = RpcUtils.getReturnTypes(inv5);
         Assertions.assertNotNull(types5);
         Assertions.assertEquals(2, types5.length);
@@ -212,7 +212,7 @@ public class RpcUtilsTest {
         Invoker invoker = mock(Invoker.class);
 
         // void sayHello(String name);
-        RpcInvocation inv1 = new RpcInvocation("sayHello", serviceName,
+        RpcInvocation inv1 = new RpcInvocation("sayHello", serviceName, "",
                 new Class<?>[]{String.class}, null, null, invoker, null);
         Class<?>[] parameterTypes1 = RpcUtils.getParameterTypes(inv1);
         Assertions.assertNotNull(parameterTypes1);
@@ -220,12 +220,12 @@ public class RpcUtilsTest {
         Assertions.assertEquals(String.class, parameterTypes1[0]);
 
         //long timestamp();
-        RpcInvocation inv2 = new RpcInvocation("timestamp", serviceName, null, null, null, invoker, null);
+        RpcInvocation inv2 = new RpcInvocation("timestamp", serviceName, "", null, null, null, invoker, null);
         Class<?>[] parameterTypes2 = RpcUtils.getParameterTypes(inv2);
         Assertions.assertEquals(0, parameterTypes2.length);
 
         //Type enumlength(Type... types);
-        RpcInvocation inv3 = new RpcInvocation("enumlength", serviceName,
+        RpcInvocation inv3 = new RpcInvocation("enumlength", serviceName, "",
                 new Class<?>[]{Type.class, Type.class}, null, null, invoker, null);
         Class<?>[] parameterTypes3 = RpcUtils.getParameterTypes(inv3);
         Assertions.assertNotNull(parameterTypes3);
@@ -234,7 +234,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(Type.class, parameterTypes3[1]);
 
         //byte getbyte(byte arg);
-        RpcInvocation inv4 = new RpcInvocation("getbyte", serviceName,
+        RpcInvocation inv4 = new RpcInvocation("getbyte", serviceName, "",
                 new Class<?>[]{byte.class}, null, null, invoker, null);
         Class<?>[] parameterTypes4 = RpcUtils.getParameterTypes(inv4);
         Assertions.assertNotNull(parameterTypes4);
@@ -242,7 +242,7 @@ public class RpcUtilsTest {
         Assertions.assertEquals(byte.class, parameterTypes4[0]);
 
         //void $invoke(String s1, String s2);
-        RpcInvocation inv5 = new RpcInvocation("$invoke", serviceName,
+        RpcInvocation inv5 = new RpcInvocation("$invoke", serviceName, "",
                 new Class<?>[]{String.class, String[].class},
                 new Object[]{"method", new String[]{"java.lang.String", "void", "java.lang.Object"}},
                 null, invoker, null);
@@ -265,7 +265,7 @@ public class RpcUtilsTest {
         String serviceName = demoServiceClass.getName();
         Invoker invoker = mock(Invoker.class);
 
-        RpcInvocation inv1 = new RpcInvocation(methodName, serviceName,
+        RpcInvocation inv1 = new RpcInvocation(methodName, serviceName, "",
                 new Class<?>[]{String.class}, null, null, invoker, null);
         String actual = RpcUtils.getMethodName(inv1);
         Assertions.assertNotNull(actual);
@@ -283,7 +283,7 @@ public class RpcUtilsTest {
         String serviceName = demoServiceClass.getName();
         Invoker invoker = mock(Invoker.class);
 
-        RpcInvocation inv = new RpcInvocation("$invoke", serviceName,
+        RpcInvocation inv = new RpcInvocation("$invoke", serviceName, "",
                 new Class<?>[]{String.class, String[].class},
                 new Object[]{method, new String[]{"java.lang.String", "void", "java.lang.Object"}},
                 null, invoker, null);
@@ -300,7 +300,7 @@ public class RpcUtilsTest {
         String serviceName = demoServiceClass.getName();
         Invoker invoker = mock(Invoker.class);
 
-        RpcInvocation inv = new RpcInvocation("$invoke", serviceName,
+        RpcInvocation inv = new RpcInvocation("$invoke", serviceName, "",
                 new Class<?>[]{String.class, String[].class, Object[].class},
                 new Object[]{"method", new String[]{}, args},
                 null, invoker, null);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index 7a60e0e..6502e8d 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -63,11 +63,11 @@ class CallbackServiceCodec {
     private static final byte CALLBACK_DESTROY = 0x2;
     private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";
 
-    private static byte isCallBack(URL url, String methodName, int argIndex) {
+    private static byte isCallBack(URL url, String protocolServiceKey, String methodName, int argIndex) {
         // parameter callback rule: method-name.parameter-index(starting from 0).callback
         byte isCallback = CALLBACK_NONE;
-        if (url != null && url.hasMethodParameter(methodName)) {
-            String callback = url.getParameter(methodName + "." + argIndex + ".callback");
+        if (url != null && url.hasServiceMethodParameter(protocolServiceKey, methodName)) {
+            String callback = url.getServiceParameter(protocolServiceKey, methodName + "." + argIndex + ".callback");
             if (callback != null) {
                 if ("true".equalsIgnoreCase(callback)) {
                     isCallback = CALLBACK_CREATE;
@@ -266,7 +266,7 @@ class CallbackServiceCodec {
     public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
         // get URL directly
         URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
-        byte callbackStatus = isCallBack(url, inv.getMethodName(), paraIndex);
+        byte callbackStatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex);
         Object[] args = inv.getArguments();
         Class<?>[] pts = inv.getParameterTypes();
         switch (callbackStatus) {
@@ -293,7 +293,7 @@ class CallbackServiceCodec {
             }
             return inObject;
         }
-        byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
+        byte callbackstatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex);
         switch (callbackstatus) {
             case CallbackServiceCodec.CALLBACK_CREATE:
                 try {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index e095523..791df67 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -195,7 +195,7 @@ public class DubboProtocol extends AbstractProtocol {
                 return null;
             }
 
-            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]);
+            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
             invocation.setAttachment(PATH_KEY, url.getPath());
             invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
             invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
@@ -572,10 +572,9 @@ public class DubboProtocol extends AbstractProtocol {
         // client type setting.
         String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
 
-//        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
+        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
         // enable heartbeat by default
-        // FIXME,
-//        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
+        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
 
         // BIO is not allowed since it has severe performance issue.
         if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index 9229363..8c4d07b 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.dubbo;
 
 import org.apache.dubbo.common.Parameters;
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
@@ -181,14 +180,10 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
      */
     private void replaceWithLazyClient() {
         // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false
-        URL lazyUrl = URLBuilder.from(url)
-                .addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)
+        URL lazyUrl = url.addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)
                 .addParameter(RECONNECT_KEY, Boolean.FALSE)
                 .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString())
-                .addParameter("warning", Boolean.TRUE.toString())
-                .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
-                .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient")
-                .build();
+                .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true);
 
         /**
          * the order of judgment in the if statement cannot be changed.


[dubbo] 03/04: can basically work with InstanceAddressURL

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 42f0529054187227dedd153b14d2b7b4605b8ff4
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Jul 14 01:51:51 2020 +0800

    can basically work with InstanceAddressURL
---
 .../src/main/java/org/apache/dubbo/common/URL.java |  12 +-
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 118 ++++++++++++++++++--
 .../registry/client/DefaultServiceInstance.java    |  27 +++--
 .../dubbo/registry/client/InstanceAddressURL.java  | 123 +++++++++++++++++----
 .../registry/client/ServiceDiscoveryRegistry.java  |   4 +-
 .../client/ServiceDiscoveryRegistryDirectory.java  |  69 +++++++-----
 .../metadata/ServiceInstanceMetadataUtils.java     |  18 +--
 .../registry/integration/DynamicDirectory.java     |   9 +-
 .../registry/integration/RegistryDirectory.java    |   2 +-
 .../apache/dubbo/remoting/exchange/Exchangers.java |   2 +-
 .../dubbo/remoting/transport/AbstractEndpoint.java |   2 +-
 .../dubbo/rpc/proxy/InvokerInvocationHandler.java  |   1 +
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   5 +-
 .../dubbo/rpc/protocol/thrift/ThriftProtocol.java  |   2 +-
 14 files changed, 292 insertions(+), 102 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 5f811fa..9742ebb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -93,19 +93,19 @@ class URL implements Serializable {
 
     private static final long serialVersionUID = -1985165475234910535L;
 
-    private final String protocol;
+    protected String protocol;
 
-    private final String username;
+    protected String username;
 
-    private final String password;
+    protected String password;
 
     // by default, host to registry
-    private final String host;
+    protected String host;
 
     // by default, port to registry
-    private final int port;
+    protected int port;
 
-    private final String path;
+    protected String path;
 
     private final Map<String, String> parameters;
 
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 4db0298..931a331 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,7 +155,13 @@ public class MetadataInfo implements Serializable {
         if (serviceInfo == null) {
             return Collections.emptyMap();
         }
-        return serviceInfo.getParams();
+        return serviceInfo.getAllParams();
+    }
+
+    @Override
+    public String toString() {
+        // FIXME
+        return super.toString();
     }
 
     public static class ServiceInfo implements Serializable {
@@ -163,9 +170,12 @@ public class MetadataInfo implements Serializable {
         private String group;
         private String version;
         private String protocol;
+        private String path; // most of the time, path is the same with the interface name.
         private Map<String, String> params;
 
+        private transient Map<String, String> consumerParams;
         private transient Map<String, Map<String, String>> methodParams;
+        private transient Map<String, Map<String, String>> consumerMethodParams;
         private transient String serviceKey;
         private transient String matchKey;
 
@@ -173,14 +183,7 @@ public class MetadataInfo implements Serializable {
         }
 
         public ServiceInfo(URL url) {
-            // FIXME, how to match registry.
-            this(
-                    url.getServiceInterface(),
-                    url.getParameter(GROUP_KEY),
-                    url.getParameter(VERSION_KEY),
-                    url.getProtocol(),
-                    null
-            );
+            this(url.getServiceInterface(), url.getParameter(GROUP_KEY), url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
 
             Map<String, String> params = new HashMap<>();
             List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
@@ -207,11 +210,12 @@ public class MetadataInfo implements Serializable {
             this.params = params;
         }
 
-        public ServiceInfo(String name, String group, String version, String protocol, Map<String, String> params) {
+        public ServiceInfo(String name, String group, String version, String protocol, String path, Map<String, String> params) {
             this.name = name;
             this.group = group;
             this.version = version;
             this.protocol = protocol;
+            this.path = path;
             this.params = params == null ? new HashMap<>() : params;
 
             this.serviceKey = URL.buildKey(name, group, version);
@@ -266,6 +270,14 @@ public class MetadataInfo implements Serializable {
             this.version = version;
         }
 
+        public String getPath() {
+            return path;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
         public Map<String, String> getParams() {
             if (params == null) {
                 return Collections.emptyMap();
@@ -277,16 +289,42 @@ public class MetadataInfo implements Serializable {
             this.params = params;
         }
 
+        public Map<String, String> getAllParams() {
+            if (consumerParams != null) {
+                Map<String, String> allParams = new HashMap<>((int) ((params.size() + consumerParams.size()) / 0.75f + 1));
+                allParams.putAll(params);
+                allParams.putAll(consumerParams);
+                return allParams;
+            }
+            return params;
+        }
+
         public String getParameter(String key) {
+            if (consumerParams != null) {
+                String value = consumerParams.get(key);
+                if (value != null) {
+                    return value;
+                }
+            }
             return params.get(key);
         }
 
         public String getMethodParameter(String method, String key, String defaultValue) {
             if (methodParams == null) {
                 methodParams = URL.toMethodParameters(params);
+                consumerMethodParams = URL.toMethodParameters(consumerParams);
+            }
+
+            String value = getMethodParameter(method, key, consumerMethodParams);
+            if (value != null) {
+                return value;
             }
+            value = getMethodParameter(method, key, methodParams);
+            return value == null ? defaultValue : value;
+        }
 
-            Map<String, String> keyMap = methodParams.get(method);
+        private String getMethodParameter(String method, String key, Map<String, Map<String, String>> map) {
+            Map<String, String> keyMap = map.get(method);
             String value = null;
             if (keyMap != null) {
                 value = keyMap.get(key);
@@ -294,7 +332,21 @@ public class MetadataInfo implements Serializable {
             if (StringUtils.isEmpty(value)) {
                 value = getParameter(key);
             }
-            return value == null ? defaultValue : value;
+            return value;
+        }
+
+        public boolean hasMethodParameter(String method, String key) {
+            String value = this.getMethodParameter(method, key, (String) null);
+            return StringUtils.isNotEmpty(value);
+        }
+
+        public boolean hasMethodParameter(String method) {
+            if (methodParams == null) {
+                methodParams = URL.toMethodParameters(params);
+                consumerMethodParams = URL.toMethodParameters(consumerParams);
+            }
+
+            return consumerMethodParams.containsKey(method) || methodParams.containsKey(method);
         }
 
         public String toDescString() {
@@ -310,5 +362,47 @@ public class MetadataInfo implements Serializable {
             }
             return methodStrings.toString();
         }
+
+        public void addParameter(String key, String value) {
+            if (consumerParams != null) {
+                this.consumerParams.put(key, value);
+            }
+        }
+
+        public void addParameterIfAbsent(String key, String value) {
+            if (consumerParams != null) {
+                this.consumerParams.putIfAbsent(key, value);
+            }
+        }
+
+        public void addConsumerParams(Map<String, String> params) {
+            // copy once for one service subscription
+            if (consumerParams == null) {
+                consumerParams = new HashMap<>(params);
+            }
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof ServiceInfo)) {
+                return false;
+            }
+
+            ServiceInfo serviceInfo = (ServiceInfo) obj;
+            return this.getMatchKey().equals(serviceInfo.getMatchKey()) && this.getParams().equals(serviceInfo.getParams());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(getMatchKey(), getParams());
+        }
+
+        @Override
+        public String toString() {
+            return super.toString();
+        }
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index e44bd4f..3ee5dd2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
+
 /**
  * The default implementation of {@link ServiceInstance}.
  *
@@ -166,18 +168,29 @@ public class DefaultServiceInstance implements ServiceInstance {
         if (this == o) return true;
         if (!(o instanceof DefaultServiceInstance)) return false;
         DefaultServiceInstance that = (DefaultServiceInstance) o;
-        return isEnabled() == that.isEnabled() &&
-                isHealthy() == that.isHealthy() &&
-                Objects.equals(getId(), that.getId()) &&
-                Objects.equals(getServiceName(), that.getServiceName()) &&
+        boolean equals = Objects.equals(getServiceName(), that.getServiceName()) &&
                 Objects.equals(getHost(), that.getHost()) &&
-                Objects.equals(getPort(), that.getPort()) &&
-                Objects.equals(getMetadata(), that.getMetadata());
+                Objects.equals(getPort(), that.getPort());
+        for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) {
+            if (entry.getKey().equals(REVISION_KEY)) {
+                continue;
+            }
+            equals = equals && !entry.getValue().equals(that.getMetadata().get(entry.getKey()));
+        }
+
+        return equals;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(getId(), getServiceName(), getHost(), getPort(), isEnabled(), isHealthy(), getMetadata());
+        int result = Objects.hash(getServiceName(), getHost(), getPort());
+        for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) {
+            if (entry.getKey().equals(REVISION_KEY)) {
+                continue;
+            }
+            result = 31 * result + (entry.getValue() == null ? 0 : entry.getValue().hashCode());
+        }
+        return result;
     }
 
     @Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 18632bf..149c6e2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -28,6 +28,9 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 
+/**
+ * FIXME, replace RpcContext operations with explicitly defined APIs
+ */
 public class InstanceAddressURL extends URL {
     private ServiceInstance instance;
     private MetadataInfo metadataInfo;
@@ -36,10 +39,20 @@ public class InstanceAddressURL extends URL {
             ServiceInstance instance,
             MetadataInfo metadataInfo
     ) {
+//        super()
         this.instance = instance;
         this.metadataInfo = metadataInfo;
-        this.setHost(instance.getHost());
-        this.setPort(instance.getPort());
+        this.host = instance.getHost();
+        this.port = instance.getPort();
+    }
+
+
+    public ServiceInstance getInstance() {
+        return instance;
+    }
+
+    public MetadataInfo getMetadataInfo() {
+        return metadataInfo;
     }
 
     @Override
@@ -71,6 +84,12 @@ public class InstanceAddressURL extends URL {
     }
 
     @Override
+    public String getPath() {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        return serviceInfo.getPath();
+    }
+
+    @Override
     public String getParameter(String key) {
         if (VERSION_KEY.equals(key)) {
             return getVersion();
@@ -80,10 +99,7 @@ public class InstanceAddressURL extends URL {
             return getServiceInterface();
         }
 
-        String value = getConsumerParameters().get(key);
-        if (StringUtils.isEmpty(value)) {
-            value = getInstanceMetadata().get(key);
-        }
+        String value = getInstanceMetadata().get(key);
         if (StringUtils.isEmpty(value) && metadataInfo != null) {
             value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey());
         }
@@ -109,15 +125,52 @@ public class InstanceAddressURL extends URL {
 
     @Override
     public String getMethodParameter(String method, String key) {
-        String value = getMethodParameter(method, key);
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        String value = serviceInfo.getMethodParameter(method, key, null);
         if (StringUtils.isNotEmpty(value)) {
             return value;
         }
-        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
-        return serviceInfo.getMethodParameter(method, key, null);
+        return getParameter(key);
     }
 
     @Override
+    public boolean hasMethodParameter(String method, String key) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+
+        if (method == null) {
+            String suffix = "." + key;
+            for (String fullKey : getParameters().keySet()) {
+                if (fullKey.endsWith(suffix)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        if (key == null) {
+            String prefix = method + ".";
+            for (String fullKey : getParameters().keySet()) {
+                if (fullKey.startsWith(prefix)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        return serviceInfo.hasMethodParameter(method, key);
+    }
+
+    @Override
+    public boolean hasMethodParameter(String method) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        return serviceInfo.hasMethodParameter(method);
+    }
+
+    /**
+     * Avoid calling this method in RPC call.
+     *
+     * @return
+     */
+    @Override
     public Map<String, String> getParameters() {
         Map<String, String> instanceParams = getInstanceMetadata();
         Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey()));
@@ -130,8 +183,6 @@ public class InstanceAddressURL extends URL {
         if (metadataParams != null) {
             params.putAll(metadataParams);
         }
-
-        params.putAll(getConsumerParameters());
         return params;
     }
 
@@ -139,32 +190,56 @@ public class InstanceAddressURL extends URL {
         return this.instance.getMetadata();
     }
 
-    private Map<String, String> getConsumerParameters() {
-        return RpcContext.getContext().getConsumerUrl().getParameters();
-    }
+    @Override
+    public URL addParameter(String key, String value) {
+        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+            return this;
+        }
 
-    private String getConsumerParameter(String key) {
-        return RpcContext.getContext().getConsumerUrl().getParameter(key);
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value);
+        return this;
     }
 
-    private String getConsumerMethodParameter(String method, String key) {
-        return RpcContext.getContext().getConsumerUrl().getMethodParameter(method, key);
+    @Override
+    public URL addParameterIfAbsent(String key, String value) {
+        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+            return this;
+        }
+
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value);
+        return this;
     }
 
-    @Override
-    public URL addParameter(String key, String value) {
-        throw new UnsupportedOperationException("");
+    public URL addConsumerParams(Map<String, String> params) {
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params);
+        return this;
     }
 
     @Override
     public boolean equals(Object obj) {
         // instance metadata equals
-        // service metadata equals
-        return super.equals(obj);
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof InstanceAddressURL)) {
+            return false;
+        }
+
+        InstanceAddressURL that = (InstanceAddressURL) obj;
+
+        return this.getInstance().equals(that.getInstance());
     }
 
     @Override
     public int hashCode() {
-        return super.hashCode();
+        return getInstance().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 9b129f9..7d4d48e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -52,6 +52,8 @@ import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
 import static java.util.stream.Collectors.toSet;
 import static java.util.stream.Stream.of;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPERATOR;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY;
@@ -316,7 +318,7 @@ public class ServiceDiscoveryRegistry implements Registry {
             List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
             serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
         });
-        listener.notify(serviceListener.getUrls(url.getProtocolServiceKey()));
+        listener.notify(serviceListener.getUrls(url.getServiceKey() + GROUP_CHAR_SEPERATOR + url.getParameter(PROTOCOL_KEY, DUBBO)));
 
         serviceListener.addListener(url.getProtocolServiceKey(), listener);
         registerServiceInstancesChangedListener(url, serviceListener);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 3fd7713..e01a36e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -45,7 +45,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
     private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class);
 
     // Map<url, Invoker> cache service url to invoker mapping.
-    private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+    private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
 
     private ServiceInstancesChangedListener listener;
 
@@ -75,11 +75,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
             destroyAllInvokers(); // Close all invokers
         } else {
             this.forbidden = false; // Allow to access
-            Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
             if (CollectionUtils.isEmpty(invokerUrls)) {
                 return;
             }
-            Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+
+            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
 
             if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                 logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
@@ -109,51 +110,63 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * @param urls
      * @return invokers
      */
-    private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
-        Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
+        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
         if (urls == null || urls.isEmpty()) {
             return newUrlInvokerMap;
         }
         for (URL url : urls) {
-            if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
+            InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
+            if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
                 continue;
             }
-            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(url.getProtocol())) {
-                logger.error(new IllegalStateException("Unsupported protocol " + url.getProtocol() +
-                        " in notified url: " + url + " from registry " + getUrl().getAddress() +
+            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
+                logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
+                        " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
                         " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                         ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                 continue;
             }
 
-            if (urlInvokerMap != null && urlInvokerMap.containsKey(url)) { // Repeated url
-                continue;
-            }
-            Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(url);
-            if (invoker == null) { // Not in the cache, refer again
+            // FIXME, some keys may need to be removed.
+            instanceAddressURL.addConsumerParams(queryMap);
+
+            Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress());
+            if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
                 try {
                     boolean enabled = true;
-                    if (url.hasParameter(DISABLED_KEY)) {
-                        enabled = !url.getParameter(DISABLED_KEY, false);
+                    if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
+                        enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
                     } else {
-                        enabled = url.getParameter(ENABLED_KEY, true);
+                        enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
                     }
                     if (enabled) {
-                        invoker = protocol.refer(serviceType, url);
+                        invoker = protocol.refer(serviceType, instanceAddressURL);
                     }
                 } catch (Throwable t) {
-                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
+                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
                 }
                 if (invoker != null) { // Put new invoker in cache
-                    newUrlInvokerMap.put(url, invoker);
+                    newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
                 }
             } else {
-                newUrlInvokerMap.put(url, invoker);
+                newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
             }
         }
         return newUrlInvokerMap;
     }
 
+    private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
+        InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
+
+        if (!newURL.getInstance().equals(oldURL.getInstance())) {
+            return true;
+        }
+
+        return !oldURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey())
+                .equals(newURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey()));
+    }
+
     private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
         return invokers;
     }
@@ -162,7 +175,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * Close all invokers
      */
     private void destroyAllInvokers() {
-        Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
         if (localUrlInvokerMap != null) {
             for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
                 try {
@@ -183,16 +196,16 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * @param oldUrlInvokerMap
      * @param newUrlInvokerMap
      */
-    private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
+    private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
         if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
             destroyAllInvokers();
             return;
         }
         // check deleted invoker
-        List<URL> deleted = null;
+        List<String> deleted = null;
         if (oldUrlInvokerMap != null) {
             Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
-            for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+            for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
                 if (!newInvokers.contains(entry.getValue())) {
                     if (deleted == null) {
                         deleted = new ArrayList<>();
@@ -203,9 +216,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
         }
 
         if (deleted != null) {
-            for (URL url : deleted) {
-                if (url != null) {
-                    Invoker<T> invoker = oldUrlInvokerMap.remove(url);
+            for (String addressKey : deleted) {
+                if (addressKey != null) {
+                    Invoker<T> invoker = oldUrlInvokerMap.remove(addressKey);
                     if (invoker != null) {
                         try {
                             invoker.destroy();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index 57010cf..1afb486 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -79,12 +79,7 @@ public class ServiceInstanceMetadataUtils {
     /**
      * The property name of The revision for all exported Dubbo services.
      */
-    public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision";
-
-    /**
-     * The property name of The revision for all subscribed Dubbo services.
-     */
-    public static String SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision";
+    public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision";
 
     /**
      * The property name of metadata storage type.
@@ -163,17 +158,6 @@ public class ServiceInstanceMetadataUtils {
     }
 
     /**
-     * The revision for all subscribed Dubbo services from the specified {@link ServiceInstance}.
-     *
-     * @param serviceInstance the specified {@link ServiceInstance}
-     * @return <code>null</code> if not exits
-     */
-    public static String getSubscribedServicesRevision(ServiceInstance serviceInstance) {
-        Map<String, String> metadata = serviceInstance.getMetadata();
-        return metadata.get(SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME);
-    }
-
-    /**
      * Get metadata's storage type
      *
      * @param registryURL the {@link URL} to connect the registry
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 8268361..80aadc8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -43,10 +43,14 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
 import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
 import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
 import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
@@ -122,7 +126,10 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
 
     private URL turnRegistryUrlToConsumerUrl(URL url) {
         return URLBuilder.from(url)
-                .setPath(url.getServiceInterface())
+                .setHost(queryMap.get(REGISTER_IP_KEY))
+                .setPort(0)
+                .setProtocol(queryMap.get(PROTOCOL_KEY) == null ? DUBBO : queryMap.get(PROTOCOL_KEY))
+                .setPath(queryMap.get(INTERFACE_KEY))
                 .clearParameters()
                 .addParameters(queryMap)
                 .removeParameter(MONITOR_KEY)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 7a06106..1062768 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -394,7 +394,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
         providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
 
         // The combination of directoryUrl and override is at the end of notify, which can't be handled here
-        this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
+//        this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
 
         if ((providerUrl.getPath() == null || providerUrl.getPath()
                 .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
index 36bcc74..1e3b278 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
@@ -105,7 +105,7 @@ public class Exchangers {
         if (handler == null) {
             throw new IllegalArgumentException("handler == null");
         }
-        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
+//        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
         return getExchanger(url).connect(url, handler);
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
index 94738a8..53ce72f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
@@ -51,7 +51,7 @@ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable
     }
 
     protected static Codec2 getChannelCodec(URL url) {
-        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
+        String codecName = url.getProtocol(); // codec extension name must stay the same with protocol name
         if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
             return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
         } else {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index b5c9e67..0eae664 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -67,6 +67,7 @@ public class InvokerInvocationHandler implements InvocationHandler {
         String serviceKey = invoker.getUrl().getServiceKey();
         rpcInvocation.setTargetServiceUniqueName(serviceKey);
 
+        // invoker.getUrl() returns consumer url.
         RpcContext.setRpcContext(invoker.getUrl());
 
         if (consumerModel != null) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 83190d2..e095523 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -572,9 +572,10 @@ public class DubboProtocol extends AbstractProtocol {
         // client type setting.
         String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
 
-        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
+//        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
         // enable heartbeat by default
-        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
+        // FIXME,
+//        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
 
         // BIO is not allowed since it has severe performance issue.
         if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
index 1737d54..2758c5e 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
@@ -191,7 +191,7 @@ public class ThriftProtocol extends AbstractProtocol {
 
         ExchangeClient client;
 
-        url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
+//        url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
 
         try {
             client = Exchangers.connect(url);


[dubbo] 01/04: metadata report status

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 838dfd6eda89e15887244f8ac69ffe6fb6123170
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Jul 7 11:43:17 2020 +0800

    metadata report status
---
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 35 ++++++++++++++++------
 .../metadata/store/RemoteMetadataServiceImpl.java  | 16 +++++-----
 2 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 932517d..73e736f 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPERATOR;
@@ -43,6 +44,7 @@ public class MetadataInfo implements Serializable {
     private Map<String, ServiceInfo> services;
 
     private transient Map<String, String> extendParams;
+    private transient AtomicBoolean reported = new AtomicBoolean(false);
 
     public MetadataInfo(String app) {
         this(app, null, null);
@@ -60,6 +62,7 @@ public class MetadataInfo implements Serializable {
             return;
         }
         this.services.put(serviceInfo.getMatchKey(), serviceInfo);
+        markChanged();
     }
 
     public void removeService(ServiceInfo serviceInfo) {
@@ -67,6 +70,7 @@ public class MetadataInfo implements Serializable {
             return;
         }
         this.services.remove(serviceInfo.getMatchKey());
+        markChanged();
     }
 
     public void removeService(String key) {
@@ -74,18 +78,11 @@ public class MetadataInfo implements Serializable {
             return;
         }
         this.services.remove(key);
-    }
-
-    public String getApp() {
-        return app;
-    }
-
-    public void setApp(String app) {
-        this.app = app;
+        markChanged();
     }
 
     public String getRevision() {
-        if (revision != null) {
+        if (revision != null && hasReported()) {
             return revision;
         }
         StringBuilder sb = new StringBuilder();
@@ -101,6 +98,26 @@ public class MetadataInfo implements Serializable {
         this.revision = revision;
     }
 
+    public boolean hasReported() {
+        return reported.get();
+    }
+
+    public void markReported() {
+        reported.compareAndSet(false, true);
+    }
+
+    public void markChanged() {
+        reported.compareAndSet(true, false);
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
     public Map<String, ServiceInfo> getServices() {
         return services;
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index 85d6855..ee41050 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -62,14 +62,16 @@ public class RemoteMetadataServiceImpl {
     public void publishMetadata(ServiceInstance instance) {
         Map<String, MetadataInfo> metadataInfos = localMetadataService.getMetadataInfos();
         metadataInfos.forEach((registryKey, metadataInfo) -> {
-            SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
-            metadataInfo.getRevision();
-            metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey);
-            MetadataReport metadataReport = getMetadataReports().get(registryKey);
-            if (metadataReport == null) {
-                metadataReport = getMetadataReports().entrySet().iterator().next().getValue();
+            if (!metadataInfo.hasReported()) {
+                SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
+                metadataInfo.getRevision();
+                metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey);
+                MetadataReport metadataReport = getMetadataReports().get(registryKey);
+                if (metadataReport == null) {
+                    metadataReport = getMetadataReports().entrySet().iterator().next().getValue();
+                }
+                metadataReport.publishAppMetadata(identifier, metadataInfo);
             }
-            metadataReport.publishAppMetadata(identifier, metadataInfo);
         });
     }