You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/03/24 07:25:05 UTC

[dubbo] 04/12: [3.0] fix service discovery implementation and introduce ClusterFilter (#7388)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 70a92457159e1b27bb7ee367118b7e32e20eb05c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Mar 17 00:03:09 2021 +0800

    [3.0] fix service discovery implementation and introduce ClusterFilter (#7388)
---
 .../org/apache/dubbo/rpc/cluster/Directory.java    |   4 +
 .../dubbo/rpc/cluster/filter/ClusterFilter.java    |  24 +++
 .../cluster/filter/DefaultFilterChainBuilder.java  |  10 +-
 .../rpc/cluster/filter/FilterChainBuilder.java     |  36 +++-
 .../rpc/cluster/filter/ProtocolFilterWrapper.java  |   9 +-
 .../filter/support}/ConsumerContextFilter.java     |  26 ++-
 .../support/ZoneAwareFilter.java}                  |  19 ++-
 .../cluster/interceptor/ClusterInterceptor.java    |   1 +
 .../ConsumerContextClusterInterceptor.java         |  60 -------
 .../cluster/support/wrapper/AbstractCluster.java   | 190 +++++++++++----------
 ...g.apache.dubbo.rpc.cluster.filter.ClusterFilter |   2 +
 ...ubbo.rpc.cluster.interceptor.ClusterInterceptor |   2 -
 dubbo-common/pom.xml                               |   4 +
 .../java/org/apache/dubbo/common/URLStrParser.java |   9 +-
 .../apache/dubbo/common/config/Environment.java    |  20 +++
 .../dubbo/common/constants/CommonConstants.java    |   6 +
 .../dubbo/common/url/component/URLAddress.java     |   2 +-
 .../dubbo/common/url/component/URLItemCache.java   |  44 +++--
 .../dubbo/common/url/component/URLParam.java       |   4 +-
 .../org/apache/dubbo/common/utils/ConfigUtils.java |  47 +++++
 .../org/apache/dubbo/config/AbstractConfig.java    |   3 +-
 .../java/org/apache/dubbo/config/Constants.java    |   2 +
 .../dubbo/config/bootstrap/DubboBootstrap.java     |   3 +-
 .../ServiceInstanceHostPortCustomizer.java         |   5 +-
 .../apache/dubbo/config/AbstractConfigTest.java    |   4 +-
 .../src/main/resources/dubbo-migration.yaml        |   3 +
 dubbo-dependencies-bom/pom.xml                     |   6 +
 dubbo-distribution/dubbo-all/pom.xml               |   8 +
 .../org/apache/dubbo/metadata/MetadataInfo.java    |   4 +
 .../dubbo/monitor/support/MonitorFilter.java       |   3 +-
 .../dubbo/auth/filter/ConsumerSignFilter.java      |   2 +-
 .../org/apache/dubbo/registry/NotifyListener.java  |   4 +
 .../registry/client/DefaultServiceInstance.java    |  58 +++----
 .../client/EventPublishingServiceDiscovery.java    |  15 ++
 .../client/FileSystemServiceDiscovery.java         |   2 +-
 .../client/SelfHostMetaServiceDiscovery.java       |   2 +-
 .../dubbo/registry/client/ServiceDiscovery.java    |   5 +
 .../registry/client/ServiceDiscoveryRegistry.java  |   2 +-
 .../client/ServiceDiscoveryRegistryDirectory.java  |   5 +-
 .../dubbo/registry/client/ServiceInstance.java     |  13 +-
 .../listener/ServiceInstancesChangedListener.java  | 117 +++++++------
 .../registry/client/metadata/MetadataUtils.java    |   2 +-
 .../metadata/store/RemoteMetadataServiceImpl.java  |   2 +-
 .../DefaultMigrationAddressComparator.java         |   4 +-
 .../client/migration/MigrationInvoker.java         |  89 +++++-----
 .../client/migration/MigrationRuleHandler.java     |   4 +-
 .../client/migration/MigrationRuleListener.java    |   8 +-
 .../registry/integration/DynamicDirectory.java     |   8 +-
 .../client/DefaultServiceInstanceTest.java         |   3 +-
 .../client/FileSystemServiceDiscoveryTest.java     |   2 +
 .../multiple/MultipleServiceDiscovery.java         |  38 +++--
 .../nacos/util/NacosNamingServiceUtils.java        |   4 +-
 .../zookeeper/ZookeeperServiceDiscovery.java       |  34 ++--
 .../ZookeeperServiceDiscoveryChangeWatcher.java    |  38 ++++-
 .../zookeeper/util/CuratorFrameworkUtils.java      |   2 +-
 .../zookeeper/ZookeeperServiceDiscoveryTest.java   |   3 +-
 .../main/java/org/apache/dubbo/rpc/BaseFilter.java |  31 ++++
 .../src/main/java/org/apache/dubbo/rpc/Filter.java |  40 +++--
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |   2 +-
 .../dubbo/internal/org.apache.dubbo.rpc.Filter     |   2 -
 .../rpc/protocol/dubbo/filter/FutureFilter.java    |   4 +-
 .../dubbo/internal/org.apache.dubbo.rpc.Filter     |   3 +-
 ...g.apache.dubbo.rpc.cluster.filter.ClusterFilter |   1 +
 .../dubbo/rpc/protocol/dubbo/FutureFilterTest.java |   4 +-
 64 files changed, 688 insertions(+), 425 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
index 9fd3ca2..5a92d97 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
@@ -65,4 +65,8 @@ public interface Directory<T> extends Node {
     void discordAddresses();
 
     RouterChain<T> getRouterChain();
+
+    default boolean isNotificationReceived() {
+        return false;
+    }
 }
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java
new file mode 100644
index 0000000..7d48dc9
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.filter;
+
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.BaseFilter;
+
+@SPI
+public interface ClusterFilter extends BaseFilter {
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
index 982008b..e2e26d2 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
@@ -27,6 +27,9 @@ import java.util.List;
 @Activate(order = 0)
 public class DefaultFilterChainBuilder implements FilterChainBuilder {
 
+    /**
+     * build consumer/provider filter chain
+     */
     @Override
     public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
         Invoker<T> last = originalInvoker;
@@ -43,14 +46,17 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
         return last;
     }
 
+    /**
+     * build consumer cluster filter chain
+     */
     @Override
     public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
         ClusterInvoker<T> last = originalInvoker;
-        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
+        List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
 
         if (!filters.isEmpty()) {
             for (int i = filters.size() - 1; i >= 0; i--) {
-                final Filter filter = filters.get(i);
+                final ClusterFilter filter = filters.get(i);
                 final Invoker<T> next = last;
                 last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
             }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
index 20275e2..949a66c 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.cluster.filter;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.BaseFilter;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -29,16 +30,27 @@ import org.apache.dubbo.rpc.cluster.Directory;
 
 @SPI("default")
 public interface FilterChainBuilder {
+    /**
+     * build consumer/provider filter chain
+     */
     <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group);
 
+    /**
+     * build consumer cluster filter chain
+     */
     <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group);
 
-    class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{
+    /**
+     * Works on provider side
+     * @param <T>
+     * @param <TYPE>
+     */
+    class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
         TYPE originalInvoker;
         Invoker<T> nextNode;
-        Filter filter;
+        FILTER filter;
 
-        public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
+        public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
             this.originalInvoker = originalInvoker;
             this.nextNode = nextNode;
             this.filter = filter;
@@ -79,8 +91,8 @@ public interface FilterChainBuilder {
                     } finally {
                         listenableFilter.removeListener(invocation);
                     }
-                } else if (filter instanceof Filter.Listener) {
-                    Filter.Listener listener = (Filter.Listener) filter;
+                } else if (filter instanceof FILTER.Listener) {
+                    FILTER.Listener listener = (FILTER.Listener) filter;
                     listener.onError(e, originalInvoker, invocation);
                 }
                 throw e;
@@ -102,8 +114,8 @@ public interface FilterChainBuilder {
                     } finally {
                         listenableFilter.removeListener(invocation);
                     }
-                } else if (filter instanceof Filter.Listener) {
-                    Filter.Listener listener = (Filter.Listener) filter;
+                } else if (filter instanceof FILTER.Listener) {
+                    FILTER.Listener listener = (FILTER.Listener) filter;
                     if (t == null) {
                         listener.onResponse(r, originalInvoker, invocation);
                     } else {
@@ -124,8 +136,14 @@ public interface FilterChainBuilder {
         }
     }
 
-    class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> {
-        public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
+    /**
+     * Works on consumer side
+     * @param <T>
+     * @param <TYPE>
+     */
+    class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
+            extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+        public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
             super(originalInvoker, nextNode, filter);
         }
 
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
index 65e8813..389b173 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
@@ -28,10 +28,9 @@ import org.apache.dubbo.rpc.ProtocolServer;
 import org.apache.dubbo.rpc.RpcException;
 
 import java.util.List;
-import java.util.Objects;
 
+import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY;
 
 /**
  * ListenerProtocol
@@ -68,11 +67,7 @@ public class ProtocolFilterWrapper implements Protocol {
         if (UrlUtils.isRegistry(url)) {
             return protocol.refer(type, url);
         }
-        // if it's peer-to-peer url
-        if (!Objects.isNull(url.getAttribute(PEER_KEY))) {
-            return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER);
-        }
-        return protocol.refer(type, url);
+        return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
similarity index 83%
rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 0fead6b..4a1ed3b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.rpc.filter;
+package org.apache.dubbo.rpc.cluster.filter.support;
 
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.utils.CollectionUtils;
@@ -28,6 +28,7 @@ import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
 
 import java.util.Map;
 
@@ -39,11 +40,11 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_K
  * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
  * for consumer invoker.It does it to make the requires info available to execution thread's RpcContext.
  *
- * @see org.apache.dubbo.rpc.Filter
+ * @see Filter
  * @see RpcContext
  */
 @Activate(group = CONSUMER, order = -10000)
-public class ConsumerContextFilter implements Filter {
+public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {
 
     @Override
     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
@@ -76,7 +77,24 @@ public class ConsumerContextFilter implements Filter {
                                 + invocation.getMethodName() + ", terminate directly."), invocation);
             }
         }
-        return invoker.invoke(invocation);
+
+        try {
+            RpcContext.removeServerContext();
+            return invoker.invoke(invocation);
+        } finally {
+            RpcContext.removeContext();
+        }
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        // pass attachments to result
+        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
+    }
+
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+
     }
 
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
similarity index 80%
rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java
rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
index 6daec08..cd0a7ab 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.rpc.cluster.interceptor;
+package org.apache.dubbo.rpc.cluster.filter.support;
 
+import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.ZoneDetector;
-import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
 
 import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
 import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
@@ -32,11 +36,11 @@ import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_
  *
  * active only when url has key 'cluster=zone-aware'
  */
-@Activate(value = "cluster:zone-aware")
-public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
+@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
+public class ZoneAwareFilter implements ClusterFilter {
 
     @Override
-    public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
         RpcContext rpcContext = RpcContext.getContext();
         String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
         String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
@@ -53,10 +57,7 @@ public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
         if (StringUtils.isNotEmpty(force)) {
             invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
         }
-    }
-
-    @Override
-    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
 
+        return invoker.invoke(invocation);
     }
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
index 199361f..821dd2e 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
 /**
  * Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
  */
+@Deprecated
 @SPI
 public interface ClusterInterceptor {
 
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
deleted file mode 100644
index 053bc87..0000000
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.cluster.interceptor;
-
-import org.apache.dubbo.common.extension.Activate;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
-
-@Activate
-public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
-
-    @Override
-    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
-        RpcContext context = RpcContext.getContext();
-        context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
-        if (invocation instanceof RpcInvocation) {
-            ((RpcInvocation) invocation).setInvoker(invoker);
-        }
-        RpcContext.removeServerContext();
-    }
-
-    @Override
-    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
-        RpcContext.removeContext(true);
-    }
-
-    @Override
-    public Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
-        return clusterInvoker.invoke(invocation);
-    }
-
-    @Override
-    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
-        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
-    }
-
-    @Override
-    public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {
-
-    }
-}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
index 9ce920d..1c2d047 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.cluster.support.wrapper;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.utils.CollectionUtils;
@@ -36,6 +37,7 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
 
 import java.util.List;
 
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_INTERCEPTOR_COMPATIBLE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INVOCATION_INTERCEPTOR_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_INTERCEPTOR_KEY;
@@ -44,15 +46,10 @@ public abstract class AbstractCluster implements Cluster {
 
     private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
 //        AbstractClusterInvoker<T> last = clusterInvoker;
-        AbstractClusterInvoker<T> last = buildInterceptorInvoker(new FilterInvoker<>(clusterInvoker));
-        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions();
+        AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker));
 
-        if (!interceptors.isEmpty()) {
-            for (int i = interceptors.size() - 1; i >= 0; i--) {
-                final ClusterInterceptor interceptor = interceptors.get(i);
-                final AbstractClusterInvoker<T> next = last;
-                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
-            }
+        if (Boolean.parseBoolean(ConfigurationUtils.getProperty(CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) {
+            return build27xCompatibleClusterInterceptors(clusterInvoker, last);
         }
         return last;
     }
@@ -70,90 +67,15 @@ public abstract class AbstractCluster implements Cluster {
         if (CollectionUtils.isEmpty(builders)) {
             return invoker;
         }
-        return new InterceptorInvoker<>(invoker, builders);
+        return new InvocationInterceptorInvoker<>(invoker, builders);
     }
 
     protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;
 
-    static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
-
-        private AbstractClusterInvoker<T> clusterInvoker;
-        private ClusterInterceptor interceptor;
-        private AbstractClusterInvoker<T> next;
-
-        public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker,
-                                      ClusterInterceptor interceptor,
-                                      AbstractClusterInvoker<T> next) {
-            this.clusterInvoker = clusterInvoker;
-            this.interceptor = interceptor;
-            this.next = next;
-        }
-
-        @Override
-        public Class<T> getInterface() {
-            return clusterInvoker.getInterface();
-        }
-
-        @Override
-        public URL getUrl() {
-            return clusterInvoker.getUrl();
-        }
-
-        @Override
-        public boolean isAvailable() {
-            return clusterInvoker.isAvailable();
-        }
-
-        @Override
-        public Result invoke(Invocation invocation) throws RpcException {
-            Result asyncResult;
-            try {
-                interceptor.before(next, invocation);
-                asyncResult = interceptor.intercept(next, invocation);
-            } catch (Exception e) {
-                // onError callback
-                if (interceptor instanceof ClusterInterceptor.Listener) {
-                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
-                    listener.onError(e, clusterInvoker, invocation);
-                }
-                throw e;
-            } finally {
-                interceptor.after(next, invocation);
-            }
-            return asyncResult.whenCompleteWithContext((r, t) -> {
-                // onResponse callback
-                if (interceptor instanceof ClusterInterceptor.Listener) {
-                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
-                    if (t == null) {
-                        listener.onMessage(r, clusterInvoker, invocation);
-                    } else {
-                        listener.onError(t, clusterInvoker, invocation);
-                    }
-                }
-            });
-        }
-
-        @Override
-        public void destroy() {
-            clusterInvoker.destroy();
-        }
-
-        @Override
-        public String toString() {
-            return clusterInvoker.toString();
-        }
-
-        @Override
-        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
-            // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter.
-            return null;
-        }
-    }
-
-    static class FilterInvoker<T> extends AbstractClusterInvoker<T> {
+    static class ClusterFilterInvoker<T> extends AbstractClusterInvoker<T> {
         private ClusterInvoker<T> filterInvoker;
 
-        public FilterInvoker(AbstractClusterInvoker<T> invoker) {
+        public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
             List<FilterChainBuilder> builders = ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
             if (CollectionUtils.isEmpty(builders)) {
                 filterInvoker = invoker;
@@ -203,10 +125,10 @@ public abstract class AbstractCluster implements Cluster {
         }
     }
 
-    static class InterceptorInvoker<T> extends AbstractClusterInvoker<T> {
+    static class InvocationInterceptorInvoker<T> extends AbstractClusterInvoker<T> {
         private ClusterInvoker<T> interceptorInvoker;
 
-        public InterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) {
+        public InvocationInterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) {
             ClusterInvoker<T> tmpInvoker = invoker;
             for (InvocationInterceptorBuilder builder : builders) {
                 tmpInvoker = builder.buildClusterInterceptorChain(tmpInvoker, INVOCATION_INTERCEPTOR_KEY, CommonConstants.CONSUMER);
@@ -248,4 +170,96 @@ public abstract class AbstractCluster implements Cluster {
             return null;
         }
     }
+
+    @Deprecated
+    private <T> ClusterInvoker<T> build27xCompatibleClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, AbstractClusterInvoker<T> last) {
+        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions();
+
+        if (!interceptors.isEmpty()) {
+            for (int i = interceptors.size() - 1; i >= 0; i--) {
+                final ClusterInterceptor interceptor = interceptors.get(i);
+                final AbstractClusterInvoker<T> next = last;
+                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
+            }
+        }
+        return last;
+    }
+
+    @Deprecated
+    static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
+
+        private AbstractClusterInvoker<T> clusterInvoker;
+        private ClusterInterceptor interceptor;
+        private AbstractClusterInvoker<T> next;
+
+        public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker,
+                                      ClusterInterceptor interceptor,
+                                      AbstractClusterInvoker<T> next) {
+            this.clusterInvoker = clusterInvoker;
+            this.interceptor = interceptor;
+            this.next = next;
+        }
+
+        @Override
+        public Class<T> getInterface() {
+            return clusterInvoker.getInterface();
+        }
+
+        @Override
+        public URL getUrl() {
+            return clusterInvoker.getUrl();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return clusterInvoker.isAvailable();
+        }
+
+        @Override
+        public Result invoke(Invocation invocation) throws RpcException {
+            Result asyncResult;
+            try {
+                interceptor.before(next, invocation);
+                asyncResult = interceptor.intercept(next, invocation);
+            } catch (Exception e) {
+                // onError callback
+                if (interceptor instanceof ClusterInterceptor.Listener) {
+                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
+                    listener.onError(e, clusterInvoker, invocation);
+                }
+                throw e;
+            } finally {
+                interceptor.after(next, invocation);
+            }
+            return asyncResult.whenCompleteWithContext((r, t) -> {
+                // onResponse callback
+                if (interceptor instanceof ClusterInterceptor.Listener) {
+                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
+                    if (t == null) {
+                        listener.onMessage(r, clusterInvoker, invocation);
+                    } else {
+                        listener.onError(t, clusterInvoker, invocation);
+                    }
+                }
+            });
+        }
+
+        @Override
+        public void destroy() {
+            clusterInvoker.destroy();
+        }
+
+        @Override
+        public String toString() {
+            return clusterInvoker.toString();
+        }
+
+        @Override
+        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
+            // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter.
+            return null;
+        }
+    }
+
+
 }
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..8f70d31
--- /dev/null
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1,2 @@
+zone-aware=org.apache.dubbo.rpc.cluster.filter.support.ZoneAwareFilter
+consumercontext=org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor
deleted file mode 100644
index 3f3f008..0000000
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor
+++ /dev/null
@@ -1,2 +0,0 @@
-context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor
-zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor
\ No newline at end of file
diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml
index 44d9fab..b424661 100644
--- a/dubbo-common/pom.xml
+++ b/dubbo-common/pom.xml
@@ -72,6 +72,10 @@
             <groupId>javax.annotation</groupId>
             <artifactId>javax.annotation-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.collections</groupId>
+            <artifactId>eclipse-collections</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
index 619b1bb..5bd2970 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
@@ -21,8 +21,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.url.component.ServiceConfigURL;
 import org.apache.dubbo.common.url.component.URLItemCache;
 
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
@@ -73,7 +74,7 @@ public final class URLStrParser {
         }
 
         TempBuf tempBuf = DECODE_TEMP_BUF.get();
-        Map<String, String> params = new HashMap<>();
+        Map<String, String> params = new UnifiedMap<>();
         int nameStart = from;
         int valueStart = -1;
         int i;
@@ -169,7 +170,7 @@ public final class URLStrParser {
         }
 
         // check cache
-        protocol = URLItemCache.checkProtocol(protocol);
+        protocol = URLItemCache.intern(protocol);
         path = URLItemCache.checkPath(path);
 
         return new ServiceConfigURL(protocol, username, password, host, port, path, parameters);
@@ -233,7 +234,7 @@ public final class URLStrParser {
         }
 
         TempBuf tempBuf = DECODE_TEMP_BUF.get();
-        Map<String, String> params = new HashMap<>();
+        Map<String, String> params = new UnifiedMap<>();
         int nameStart = from;
         int valueStart = -1;
         int i;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
index ea4b4e5..3da6cca 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
@@ -17,11 +17,13 @@
 package org.apache.dubbo.common.config;
 
 import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.context.FrameworkExt;
 import org.apache.dubbo.common.context.LifecycleAdapter;
 import org.apache.dubbo.common.extension.DisableInject;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConfigUtils;
 import org.apache.dubbo.config.AbstractConfig;
 import org.apache.dubbo.config.ConfigCenterConfig;
 import org.apache.dubbo.config.context.ConfigConfigurationAdapter;
@@ -54,6 +56,7 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
     private boolean configCenterFirst = true;
 
     private DynamicConfiguration dynamicConfiguration;
+    private String localMigrationRule;
 
     public Environment() {
         this.propertiesConfiguration = new PropertiesConfiguration();
@@ -76,6 +79,19 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
 
         this.externalConfiguration.setProperties(externalConfigurationMap);
         this.appExternalConfiguration.setProperties(appExternalConfigurationMap);
+
+        loadMigrationRule();
+    }
+
+    private void loadMigrationRule() {
+        // Resolution order: JVM system property, then environment variable, then the default file name.
+        String location = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY);
+        if (location == null || location.isEmpty()) {
+            String fromEnv = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY);
+            boolean envSet = fromEnv != null && !fromEnv.isEmpty();
+            location = envSet ? fromEnv : CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE;
+        }
+        this.localMigrationRule = ConfigUtils.loadMigrationRule(location);
     }
 
     @DisableInject
@@ -220,6 +236,10 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
         return appExternalConfiguration;
     }
 
+    public String getLocalMigrationRule() {
+        return localMigrationRule;
+    }
+
     // For test
     public void clearExternalConfigs() {
         this.externalConfiguration.clear();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 268c313..953b11b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -45,6 +45,10 @@ public interface CommonConstants {
 
     String DEFAULT_DUBBO_PROPERTIES = "dubbo.properties";
 
+    String DUBBO_MIGRATION_KEY = "dubbo.migration.file";
+
+    String DEFAULT_DUBBO_MIGRATION_FILE = "dubbo-migration.yaml";
+
     String ANY_VALUE = "*";
 
     /**
@@ -376,4 +380,6 @@ public interface CommonConstants {
     String CACHE_CLEAR_TASK_INTERVAL = "dubbo.application.url.cache.task.interval";
     String CACHE_CLEAR_WAITING_THRESHOLD = "dubbo.application.url.cache.clear.waiting";
 
+    String CLUSTER_INTERCEPTOR_COMPATIBLE_KEY = "dubbo.application.cluster.interceptor.compatible";
+
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
index 63d140b..a0e89f1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
@@ -252,7 +252,7 @@ public class URLAddress implements Serializable {
         }
 
         // check cache
-        protocol = URLItemCache.checkProtocol(protocol);
+        protocol = URLItemCache.intern(protocol);
         path = URLItemCache.checkPath(path);
 
         return new PathURLAddress(protocol, username, password, path, host, port, rawAddress);
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
index 2384493..54372c2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
@@ -19,14 +19,13 @@ package org.apache.dubbo.common.url.component;
 import org.apache.dubbo.common.utils.LRUCache;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class URLItemCache {
     // thread safe with limited size, by default 1000
     private static final Map<String, String> PARAM_KEY_CACHE = new LRUCache<>(10000);
-    private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(100000);
+    private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(50000);
     private static final Map<String, String> PATH_CACHE = new LRUCache<>(10000);
-    private static final Map<String, String> PROTOCOL_CACHE = new ConcurrentHashMap<>();
+    private static final Map<String, String> REVISION_CACHE = new LRUCache<>(10000);
 
     public static void putParams(Map<String, String> params, String key, String value) {
         String cachedKey = PARAM_KEY_CACHE.get(key);
@@ -43,17 +42,6 @@ public class URLItemCache {
         params.put(cachedKey, cachedValue);
     }
 
-    public static String checkProtocol(String _protocol) {
-        if (_protocol == null) {
-            return _protocol;
-        }
-        String cachedProtocol = PROTOCOL_CACHE.putIfAbsent(_protocol, _protocol);
-        if (cachedProtocol != null) {
-            return cachedProtocol;
-        }
-        return _protocol;
-    }
-
     public static String checkPath(String _path) {
         if (_path == null) {
             return _path;
@@ -64,4 +52,32 @@ public class URLItemCache {
         }
         return _path;
     }
+
+    // Returns the cached instance equal to the given revision, registering the given
+    // instance in REVISION_CACHE when no equal one is present yet. Null passes through.
+    public static String checkRevision(String _revision) {
+        if (_revision == null) {
+            return null;
+        }
+        String previous = REVISION_CACHE.putIfAbsent(_revision, _revision);
+        // a non-null result is the instance that was already cached for this value
+        return previous == null ? _revision : previous;
+    }
+
+    // Null-safe wrapper around String.intern(): null is returned unchanged, any other
+    // value is replaced by its interned instance.
+    public static String intern(String _protocol) {
+        String canonical = _protocol == null ? null : _protocol.intern();
+        return canonical;
+    }
+
+    // Stores the pair in params, interning both strings first unless either one is null.
+    public static void putParamsIntern(Map<String, String> params, String key, String value) {
+        boolean bothPresent = key != null && value != null;
+        if (bothPresent) {
+            params.put(key.intern(), value.intern());
+        } else {
+            params.put(key, value);
+        }
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
index b2dca0c..cea92cb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.common.URLStrParser;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
@@ -263,7 +265,7 @@ public class URLParam implements Serializable {
 
     public static URLParam parse(String rawParam) {
         String[] parts = rawParam.split("&");
-        Map<String, String> parameters = new HashMap<>((int) (parts.length/.75f) + 1);
+        Map<String, String> parameters = new UnifiedMap<>((int) (parts.length/.75f) + 1);
         for (String part : parts) {
             part = part.trim();
             if (part.length() > 0) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
index c1f4711..d0d0693 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
@@ -21,12 +21,16 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -296,6 +300,49 @@ public class ConfigUtils {
         return properties;
     }
 
+    /**
+     * Loads the raw migration rule text from the file system when {@code fileName} exists there,
+     * otherwise as a class-loader resource; failures are logged and the text read so far is returned.
+     */
+    public static String loadMigrationRule(String fileName) {
+        String rule = "";
+        boolean onFileSystem = checkFileNameExist(fileName);
+        try {
+            if (onFileSystem) {
+                try (FileInputStream input = new FileInputStream(fileName)) {
+                    rule = readString(input);
+                }
+            } else {
+                InputStream is = ClassUtils.getClassLoader().getResourceAsStream(fileName);
+                if (is != null) {
+                    rule = readString(is);
+                }
+            }
+        } catch (Throwable e) {
+            logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
+        }
+        return rule;
+    }
+
+    // Reads the whole stream as UTF-8 text (not the platform default charset) and closes it.
+    // An IOException is logged and whatever was decoded before the failure is returned.
+    private static String readString(InputStream is) {
+        StringBuilder content = new StringBuilder();
+        char[] chunk = new char[1024];
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
+            int n;
+            while ((n = reader.read(chunk)) != -1) {
+                // keep only the n chars filled by this read; the chunk itself is reused
+                content.append(Arrays.copyOf(chunk, n));
+            }
+        } catch (IOException e) {
+            logger.error("Read migration file error.", e);
+        }
+
+        return content.toString();
+    }
+
     /**
      * check if the fileName can be found in filesystem
      *
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
index 4e7bbf4..47556c5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
@@ -36,6 +36,7 @@ import javax.annotation.PostConstruct;
 import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -336,7 +337,7 @@ public abstract class AbstractConfig implements Serializable {
             String value = entry.getValue();
             result.put(pre + key, value);
             // For compatibility, key like "registry-type" will has a duplicate key "registry.type"
-            if (key.contains("-")) {
+            if (Arrays.asList(Constants.DOT_COMPATIBLE_KEYS).contains(key)) { // plain membership test; the key array is not sorted
                 result.put(pre + key.replace('-', '.'), value);
             }
         }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
index f8fed84..894f138 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
@@ -117,4 +117,6 @@ public interface Constants {
     String REGISTER_KEY = "register";
 
     String MULTI_SERIALIZATION_KEY = "serialize.multiple";
+
+    String[] DOT_COMPATIBLE_KEYS = new String[]{"qos-enable", "qos-port", "qos-accept-foreign-ip"};
 }
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index a4116f4..cdf66dd 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -520,6 +520,7 @@ public class DubboBootstrap extends GenericEventListener {
         }
 
         ApplicationModel.initFrameworkExts();
+        
 
         startConfigCenter();
 
@@ -1162,7 +1163,7 @@ public class DubboBootstrap extends GenericEventListener {
 
     private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
         // register instance only when at least one service is exported.
-        if (serviceInstance.getPort() != null && serviceInstance.getPort() != -1) {
+        if (serviceInstance.getPort() > 0) {
             publishMetadataToRemote(serviceInstance);
             logger.info("Start registering instance address to registry.");
             getServiceDiscoveries().forEach(serviceDiscovery ->
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
index 3831337..8693828 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
@@ -36,14 +36,14 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi
     @Override
     public void customize(ServiceInstance serviceInstance) {
 
-        if (serviceInstance.getPort() != null) {
+        if (serviceInstance.getPort() > 0) {
             return;
         }
 
         WritableMetadataService writableMetadataService = WritableMetadataService.getDefaultExtension();
 
         String host = null;
-        Integer port = null;
+        int port = -1;
         Set<URL> urls = writableMetadataService.getExportedServiceURLs();
         if (CollectionUtils.isNotEmpty(urls)) {
             String preferredProtocol = ApplicationModel.getApplicationConfig().getProtocol();
@@ -64,7 +64,6 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi
                 DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance;
                 instance.setHost(host);
                 instance.setPort(port);
-                instance.setId(host + ":" + port);
             }
         }
     }
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
index 041cd75..04d5f72 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
@@ -116,8 +116,6 @@ public class AbstractConfigTest {
         Assertions.assertEquals("ONE,1", parameters.get("prefix.num"));
         Assertions.assertEquals("hello%2Fworld", parameters.get("prefix.naming"));
         Assertions.assertEquals("30", parameters.get("prefix.age"));
-        Assertions.assertTrue(parameters.containsKey("prefix.key-2"));
-        Assertions.assertTrue(parameters.containsKey("prefix.key.2"));
         Assertions.assertFalse(parameters.containsKey("prefix.secret"));
     }
 
@@ -807,7 +805,7 @@ public class AbstractConfigTest {
         public Map getParameters() {
             Map<String, String> map = new HashMap<String, String>();
             map.put("key.1", "one");
-            map.put("key-2", "two");
+            map.put("key.2", "two");
             return map;
         }
     }
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml
new file mode 100644
index 0000000..dc2e8f2
--- /dev/null
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml
@@ -0,0 +1,3 @@
+key: demo-consumer
+step: FORCE_APPLICATION
+threshold: 0.1
\ No newline at end of file
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index e6b67c1..8cc1d43 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -91,6 +91,7 @@
         <!-- Common libs -->
         <spring_version>4.3.16.RELEASE</spring_version>
         <javassist_version>3.20.0-GA</javassist_version>
+        <eclipse_collections_version>10.4.0</eclipse_collections_version>
         <netty_version>3.2.5.Final</netty_version>
         <netty4_version>4.1.56.Final</netty4_version>
         <mina_version>1.1.7</mina_version>
@@ -186,6 +187,11 @@
                 <version>${javassist_version}</version>
             </dependency>
             <dependency>
+                <groupId>org.eclipse.collections</groupId>
+                <artifactId>eclipse-collections</artifactId>
+                <version>${eclipse_collections_version}</version>
+            </dependency>
+            <dependency>
                 <groupId>org.jboss.netty</groupId>
                 <artifactId>netty</artifactId>
                 <version>${netty_version}</version>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index 5729680..b464e41 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -312,6 +312,10 @@
             <artifactId>javassist</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.eclipse.collections</groupId>
+            <artifactId>eclipse-collections</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
@@ -513,6 +517,10 @@
                                 </transformer>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                     <resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.InvokerListener</resource>
                                 </transformer>
                                 <transformer
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 024328e..4106ad2 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -42,6 +42,8 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPAR
 import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
 
 public class MetadataInfo implements Serializable {
+    public static final MetadataInfo EMPTY = new MetadataInfo();
+
     private String app;
     private String revision;
     private Map<String, ServiceInfo> services;
@@ -50,6 +52,8 @@ public class MetadataInfo implements Serializable {
     private transient Map<String, String> extendParams;
     private transient AtomicBoolean reported = new AtomicBoolean(false);
 
+    public MetadataInfo() {}
+
     public MetadataInfo(String app) {
         this(app, null, null);
     }
diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
index 1492e0b..47e4bc7 100644
--- a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
+++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
@@ -49,7 +48,7 @@ import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
 /**
  * MonitorFilter. (SPI, Singleton, ThreadSafe)
  */
-@Activate(group = {PROVIDER, CONSUMER})
+@Activate(group = {PROVIDER})
 public class MonitorFilter implements Filter, Filter.Listener {
 
     private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
diff --git a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
index cf984a5..96438c5 100644
--- a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
+++ b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
@@ -33,7 +33,7 @@ import org.apache.dubbo.rpc.RpcException;
  *
  * @see org.apache.dubbo.rpc.Filter
  */
-@Activate(group = CommonConstants.CONSUMER, order = -10000)
+@Activate(group = CommonConstants.CONSUMER, value = Constants.SERVICE_AUTH, order = -10000)
 public class ConsumerSignFilter implements Filter {
 
     @Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
index 89e3e75..4c02cdc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
@@ -45,4 +45,8 @@ public interface NotifyListener {
     default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
     }
 
+    default URL getConsumerUrl() {
+        return null;
+    }
+
 }
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 93ba70e..1bcf284 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client;
 import org.apache.dubbo.metadata.MetadataInfo;
 
 import com.alibaba.fastjson.JSON;
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
 
 import java.util.HashMap;
 import java.util.List;
@@ -39,24 +40,23 @@ public class DefaultServiceInstance implements ServiceInstance {
 
     private static final long serialVersionUID = 1149677083747278100L;
 
-    private String id;
-
     private String serviceName;
 
     private String host;
 
-    private Integer port;
+    private int port;
 
     private boolean enabled;
 
     private boolean healthy;
 
-    private Map<String, String> metadata = new HashMap<>();
+    private Map<String, String> metadata = new UnifiedMap<>();
 
     private transient String address;
     private transient MetadataInfo serviceMetadata;
     // used at runtime
-    private transient Map<String, String> extendParams = new HashMap<>();
+    private transient String registryCluster; // extendParams can be more flexible, but one single property uses less space
+    private transient Map<String, String> extendParams;
     private transient List<Endpoint> endpoints;
 
     public DefaultServiceInstance() {
@@ -70,17 +70,16 @@ public class DefaultServiceInstance implements ServiceInstance {
         this.healthy = other.healthy;
         this.metadata = other.metadata;
         this.serviceMetadata = other.serviceMetadata;
+        this.registryCluster = other.registryCluster;
         this.extendParams = other.extendParams;
         this.endpoints = other.endpoints;
         this.address = null;
-        this.id = null;
     }
 
-    public DefaultServiceInstance(String id, String serviceName, String host, Integer port) {
+    public DefaultServiceInstance(String serviceName, String host, Integer port) {
-        if (port != null && port.intValue() < 1) {
+        if (port == null || port.intValue() < 1) { // the int field cannot hold null; reject it here instead of an NPE on unboxing
             throw new IllegalArgumentException("The port must be greater than zero!");
         }
-        this.id = id;
         this.serviceName = serviceName;
         this.host = host;
         this.port = port;
@@ -88,18 +87,10 @@ public class DefaultServiceInstance implements ServiceInstance {
         this.healthy = true;
     }
 
-    public DefaultServiceInstance(String serviceName, String host, Integer port) {
-        this(host + ":" + port, serviceName, host, port);
-    }
-
     public DefaultServiceInstance(String serviceName) {
         this.serviceName = serviceName;
     }
 
-    public void setId(String id) {
-        this.id = id;
-    }
-
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
     }
@@ -109,11 +100,6 @@ public class DefaultServiceInstance implements ServiceInstance {
     }
 
     @Override
-    public String getId() {
-        return id;
-    }
-
-    @Override
     public String getServiceName() {
         return serviceName;
     }
@@ -123,12 +109,12 @@ public class DefaultServiceInstance implements ServiceInstance {
         return host;
     }
 
-    public void setPort(Integer port) {
+    public void setPort(int port) {
         this.port = port;
     }
 
     @Override
-    public Integer getPort() {
+    public int getPort() {
         return port;
     }
 
@@ -173,7 +159,19 @@ public class DefaultServiceInstance implements ServiceInstance {
     }
 
     @Override
+    public String getRegistryCluster() {
+        return registryCluster;
+    }
+
+    public void setRegistryCluster(String registryCluster) {
+        this.registryCluster = registryCluster;
+    }
+
+    @Override
     public Map<String, String> getExtendParams() {
+        if (extendParams == null) {
+            extendParams = new HashMap<>();
+        }
         return extendParams;
     }
 
@@ -187,16 +185,19 @@ public class DefaultServiceInstance implements ServiceInstance {
     public DefaultServiceInstance copy(Endpoint endpoint) {
         DefaultServiceInstance copyOfInstance = new DefaultServiceInstance(this);
         copyOfInstance.setPort(endpoint.getPort());
-        copyOfInstance.setId(copyOfInstance.getAddress());
         return copyOfInstance;
     }
 
     @Override
     public Map<String, String> getAllParams() {
-        Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1));
-        allParams.putAll(metadata);
-        allParams.putAll(extendParams);
-        return allParams;
+        if (extendParams == null) {
+            return metadata;
+        } else {
+            Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1));
+            allParams.putAll(metadata);
+            allParams.putAll(extendParams);
+            return allParams;
+        }
     }
 
     public void setMetadata(Map<String, String> metadata) {
@@ -249,7 +250,6 @@ public class DefaultServiceInstance implements ServiceInstance {
     @Override
     public String toString() {
         return "DefaultServiceInstance{" +
-                "id='" + id + '\'' +
                 ", serviceName='" + serviceName + '\'' +
                 ", host='" + host + '\'' +
                 ", port=" + port +
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
index ee99000..f9c8019 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
@@ -224,6 +224,21 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery {
     }
 
     @Override
+    public ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+        return serviceDiscovery.createListener(serviceNames);
+    }
+
+    @Override
+    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        serviceDiscovery.removeServiceInstancesChangedListener(listener);
+    }
+
+    @Override
+    public long getDelay() {
+        return serviceDiscovery.getDelay();
+    }
+
+    @Override
     public URL getUrl() {
         return serviceDiscovery.getUrl();
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
index 2a51168..1482a9d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
@@ -110,7 +110,7 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
     }
 
     private String getServiceInstanceId(ServiceInstance serviceInstance) {
-        String id = serviceInstance.getId();
+        String id = serviceInstance.getAddress();
         if (StringUtils.isBlank(id)) {
             return serviceInstance.getHost() + "." + serviceInstance.getPort();
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 1034420..7537090 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -207,7 +207,7 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
 
     @SuppressWarnings("unchecked")
     public final void fillServiceInstance(DefaultServiceInstance serviceInstance) {
-        String hostId = serviceInstance.getId();
+        String hostId = serviceInstance.getAddress();
         if (metadataMap.containsKey(hostId)) {
             // Use cached metadata.
             // Metadata will be updated by provider callback
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index 90ba196..aa318d4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -219,6 +219,7 @@ public interface ServiceDiscovery extends Prioritized {
 
     /**
      * unsubscribe to instances change event.
+     *
      * @param listener
      * @throws IllegalArgumentException
      */
@@ -226,6 +227,10 @@ public interface ServiceDiscovery extends Prioritized {
             throws IllegalArgumentException {
     }
 
+    default ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+        return new ServiceInstancesChangedListener(serviceNames, this); // default: a plain listener for these service names, bound to this discovery
+    }
+
     /**
      * Dispatch the {@link ServiceInstancesChangedEvent}
      *
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index d19c8de..31ae6f9 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -291,7 +291,7 @@ public class ServiceDiscoveryRegistry implements Registry {
 
         // register ServiceInstancesChangedListener
         ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey, k -> {
-            ServiceInstancesChangedListener serviceInstancesChangedListener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+            ServiceInstancesChangedListener serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
             serviceInstancesChangedListener.setUrl(url);
             serviceNames.forEach(serviceName -> {
                 List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index c41e028..6232097 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -32,9 +32,10 @@ import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.cluster.RouterChain;
 
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -152,7 +153,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * @return invokers
      */
     private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
-        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+        Map<String, Invoker<T>> newUrlInvokerMap = new UnifiedMap<>();
         if (urls == null || urls.isEmpty()) {
             return newUrlInvokerMap;
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 5bb2bfe..3a55c4e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -29,13 +29,6 @@ import java.util.SortedMap;
 public interface ServiceInstance extends Serializable {
 
     /**
-     * The id of the registered service instance.
-     *
-     * @return nullable
-     */
-    String getId();
-
-    /**
      * The name of service that current instance belongs to.
      *
      * @return non-null
@@ -54,7 +47,7 @@ public interface ServiceInstance extends Serializable {
      *
      * @return the positive integer if present
      */
-    Integer getPort();
+    int getPort();
 
     String getAddress();
 
@@ -87,6 +80,10 @@ public interface ServiceInstance extends Serializable {
 
     SortedMap<String, String> getSortedMetadata();
 
+    String getRegistryCluster();
+
+    void setRegistryCluster(String registryCluster);
+
     Map<String, String> getExtendParams();
 
     Map<String, String> getAllParams();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 22280bd..dd0a0cf 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -53,7 +53,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
-import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
 import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
 
@@ -67,14 +66,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 
     private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class);
 
-    private final Set<String> serviceNames;
-    private final ServiceDiscovery serviceDiscovery;
-    private URL url;
-    private Map<String, NotifyListener> listeners;
+    protected final Set<String> serviceNames;
+    protected final ServiceDiscovery serviceDiscovery;
+    protected URL url;
+    protected Map<String, NotifyListener> listeners;
 
     private Map<String, List<ServiceInstance>> allInstances;
 
-    private Map<String, List<URL>> serviceUrls;
+    private Map<String, Object> serviceUrls;
 
     private Map<String, MetadataInfo> revisionToMetadata;
 
@@ -112,8 +111,8 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 
         Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
         Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();
-        Map<String, Map<Set<String>, List<URL>>> protocolRevisionsToUrls = new HashMap<>();
-        Map<String, List<URL>> newServiceUrls = new HashMap<>();//TODO
+        Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new HashMap<>();
+        Map<String, Object> newServiceUrls = new HashMap<>();//TODO
         Map<String, MetadataInfo> newRevisionToMetadata = new HashMap<>();
 
         for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
@@ -151,27 +150,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 
         localServiceToRevisions.forEach((serviceInfo, revisions) -> {
             String protocol = serviceInfo.getProtocol();
-            Map<Set<String>, List<URL>> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> {
+            Map<Set<String>, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> {
                 return new HashMap<>();
             });
-            List<URL> urls = revisionsToUrls.get(revisions);
+            Object urls = revisionsToUrls.get(revisions);
             if (urls != null) {
                 newServiceUrls.put(serviceInfo.getMatchKey(), urls);
             } else {
-                urls = new ArrayList<>();
-                for (String r : revisions) {
-                    for (ServiceInstance i : revisionToInstances.get(r)) {
-                        // different protocols may have ports specified in meta
-                        if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
-                            DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
-                            if (endpoint != null && !endpoint.getPort().equals(i.getPort())) {
-                                urls.add(((DefaultServiceInstance)i).copy(endpoint).toURL());
-                                break;
-                            }
-                        }
-                        urls.add(i.toURL());
-                    }
-                }
+                urls = getServiceUrlsCache(revisionToInstances, revisions, protocol);
                 revisionsToUrls.put(revisions, urls);
                 newServiceUrls.put(serviceInfo.getMatchKey(), urls);
             }
@@ -183,7 +169,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
 
     public synchronized void addListenerAndNotify(String serviceKey, NotifyListener listener) {
         this.listeners.put(serviceKey, listener);
-        List<URL> urls = this.serviceUrls.get(serviceKey);
+        List<URL> urls = getAddresses(serviceKey);
         if (CollectionUtils.isNotEmpty(urls)) {
             listener.notify(urls);
         }
@@ -197,7 +183,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
     }
 
     public List<URL> getUrls(String serviceKey) {
-        return toUrlsWithEmpty(serviceUrls.get(serviceKey));
+        return toUrlsWithEmpty(getAddresses(serviceKey));
     }
 
     /**
@@ -241,7 +227,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         return serviceNames.contains(event.getServiceName());
     }
 
-    private boolean isRetryAndExpired(ServiceInstancesChangedEvent event) {
+    protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) {
         String appName = event.getServiceName();
         List<ServiceInstance> appInstances = event.getServiceInstances();
 
@@ -261,13 +247,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         return false;
     }
 
-    private boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) {
+    protected boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) {
         if (revisionToMetadata == null) {
             return false;
         }
         boolean result = false;
         for (Map.Entry<String, MetadataInfo> entry : revisionToMetadata.entrySet()) {
-            if (entry.getValue() == null) {
+            if (entry.getValue() == MetadataInfo.EMPTY) {
                 result = true;
                 break;
             }
@@ -275,31 +261,31 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         return result;
     }
 
-    private MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) {
+    protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) {
         MetadataInfo metadata = revisionToMetadata.get(revision);
-        if (metadata == null) {
-            if (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)) {
-                metadata = getMetadataInfo(instance);
-                if (metadata != null) {
-                    logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata);
-                    failureCounter.set(0);
-                    revisionToMetadata.putIfAbsent(revision, metadata);
-                    parseMetadata(revision, metadata, localServiceToRevisions);
-                } else {
-                    logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision
-                            + ", wait for retry.");
-                    lastFailureTime = System.currentTimeMillis();
-                    failureCounter.incrementAndGet();
-                }
+        if (metadata == null
+                || (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
+            metadata = getMetadataInfo(instance);
+
+            if (metadata != MetadataInfo.EMPTY) {
+                logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata);
+                failureCounter.set(0);
+                revisionToMetadata.putIfAbsent(revision, metadata);
+                parseMetadata(revision, metadata, localServiceToRevisions);
+            } else {
+                logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision
+                        + ", wait for retry.");
+                lastFailureTime = System.currentTimeMillis();
+                failureCounter.incrementAndGet();
             }
-        } else if (subInstances.size() == 1) {
+        } else if (metadata != MetadataInfo.EMPTY && subInstances.size() == 1) {
             // "subInstances.size() >= 2" means metadata of this revision has been parsed, ignore
             parseMetadata(revision, metadata, localServiceToRevisions);
         }
         return metadata;
     }
 
-    private Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
+    protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
         Map<String, ServiceInfo> serviceInfos = metadata.getServices();
         for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
             Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), k -> new TreeSet<>());
@@ -309,10 +295,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         return localServiceToRevisions;
     }
 
-    private MetadataInfo getMetadataInfo(ServiceInstance instance) {
+    protected MetadataInfo getMetadataInfo(ServiceInstance instance) {
         String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
         // FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry implementation.
-        instance.getExtendParams().putIfAbsent(REGISTRY_CLUSTER_KEY, RegistryClusterIdentifier.getExtension(url).consumerKey(url));
+        if (instance.getRegistryCluster() == null) {
+            instance.setRegistryCluster(RegistryClusterIdentifier.getExtension(url).consumerKey(url));
+        }
         MetadataInfo metadataInfo;
         try {
             if (logger.isDebugEnabled()) {
@@ -332,19 +320,46 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
             logger.error("Failed to load service metadata, meta type is " + metadataType, e);
             metadataInfo = null;
         }
+
+        if (metadataInfo == null) {
+            metadataInfo = MetadataInfo.EMPTY;
+        }
         return metadataInfo;
     }
 
-    private void notifyAddressChanged() {
+    protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol) {
+        // Collects one URL per instance of each given revision; typed Object because the serviceUrls cache stores Object values.
+        List<URL> urls = new ArrayList<>();
+        for (String r : revisions) {
+            for (ServiceInstance i : revisionToInstances.get(r)) {
+                // different protocols may have ports specified in meta
+                if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
+                    DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
+                    if (endpoint != null && !endpoint.getPort().equals(i.getPort())) {
+                        urls.add(((DefaultServiceInstance) i).copy(endpoint).toURL());
+                        continue; // this instance is done; 'break' would silently drop the revision's remaining instances
+                    }
+                }
+                urls.add(i.toURL());
+            }
+        }
+        return urls;
+    }
+
+    protected List<URL> getAddresses(String serviceProtocolKey) {
+        return (List<URL>) serviceUrls.get(serviceProtocolKey); // unchecked cast: serviceUrls holds Object values; yields null when the key is absent
+    }
+
+    protected void notifyAddressChanged() {
         listeners.forEach((key, notifyListener) -> {
             //FIXME, group wildcard match
-            List<URL> urls = toUrlsWithEmpty(serviceUrls.get(key));
+            List<URL> urls = toUrlsWithEmpty(getAddresses(key));
             logger.info("Notify service " + key + " with urls " + urls.size());
             notifyListener.notify(urls);
         });
     }
 
-    private List<URL> toUrlsWithEmpty(List<URL> urls) {
+    protected List<URL> toUrlsWithEmpty(List<URL> urls) {
         if (urls == null) {
             urls = Collections.emptyList();
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index 27f66f5..42382f8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -86,7 +86,7 @@ public class MetadataUtils {
     }
 
     public static String computeKey(ServiceInstance serviceInstance) {
-        return serviceInstance.getServiceName() + "##" + serviceInstance.getId() + "##" +
+        return serviceInstance.getServiceName() + "##" + serviceInstance.getAddress() + "##" +
                 ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance);
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index bca273b..d217e2d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -85,7 +85,7 @@ public class RemoteMetadataServiceImpl {
         SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(),
                 ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
 
-        String registryCluster = instance.getExtendParams().get(REGISTRY_CLUSTER_KEY);
+        String registryCluster = instance.getRegistryCluster();
 
         checkRemoteConfigured();
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
index 30f1ea1..a9c7b8c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
@@ -39,11 +39,11 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar
     @Override
     public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker, MigrationRule rule) {
         if (!serviceDiscoveryInvoker.hasProxyInvokers()) {
-            logger.info("No instance address available, will not migrate.");
+            logger.info("No instance address available, stop compare.");
             return false;
         }
         if (!invoker.hasProxyInvokers()) {
-            logger.info("No interface address available, will migrate.");
+            logger.info("No interface address available, stop compare.");
             return true;
         }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 15e2434..2e94d3f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.registry.client.migration.model.MigrationStep;
 import org.apache.dubbo.registry.integration.DynamicDirectory;
 import org.apache.dubbo.registry.integration.RegistryProtocol;
 import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.Cluster;
@@ -36,7 +35,6 @@ import org.apache.dubbo.rpc.cluster.Directory;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ConsumerModel;
 
-import java.util.List;
 import java.util.Set;
 
 import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
@@ -288,6 +286,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
 
     private volatile boolean invokersChanged;
 
+    /**
+     * Need to know which invoker change triggered this compare.
+     */
     private synchronized void compareAddresses(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
         this.invokersChanged = true;
         if (logger.isDebugEnabled()) {
@@ -297,10 +298,10 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
         Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
         if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) {
             logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to APP Level address");
-            discardInterfaceInvokerAddress(invoker);
+            destroyInterfaceInvoker(invoker);
         } else {
             logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to Service Level address");
-            discardServiceDiscoveryInvokerAddress(serviceDiscoveryInvoker);
+            destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker);
         }
     }
 
@@ -310,26 +311,28 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
         }
         if (serviceDiscoveryInvoker != null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+            if (serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
+                if (logger.isInfoEnabled()) {
+                    logger.info("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+                }
+                serviceDiscoveryInvoker.destroy();
             }
-            serviceDiscoveryInvoker.destroy();
         }
     }
 
-    protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) {
-        if (this.invoker != null) {
-            this.currentAvailableInvoker = this.invoker;
-            updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
-        }
-        if (serviceDiscoveryInvoker != null) {
-            if (logger.isDebugEnabled()) {
-                List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
-                logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size()));
-            }
-//            serviceDiscoveryInvoker.getDirectory().discordAddresses();
-        }
-    }
+//    protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) {
+//        if (this.invoker != null) {
+//            this.currentAvailableInvoker = this.invoker;
+//            updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
+//        }
+//        if (serviceDiscoveryInvoker != null) {
+//            if (logger.isDebugEnabled()) {
+//                List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
+//                logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size()));
+//            }
+////            serviceDiscoveryInvoker.getDirectory().discordAddresses();
+//        }
+//    }
 
     protected void refreshServiceDiscoveryInvoker() {
         clearListener(serviceDiscoveryInvoker);
@@ -338,8 +341,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
                 logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
             }
             serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
-        } else {
-            ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).markInvokersChanged();
         }
     }
 
@@ -352,8 +353,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             }
 
             invoker = registryProtocol.getInvoker(cluster, registry, type, url);
-        } else {
-            ((DynamicDirectory) invoker.getDirectory()).markInvokersChanged();
         }
     }
 
@@ -363,26 +362,28 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             updateConsumerModel(currentAvailableInvoker, invoker);
         }
         if (invoker != null) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName());
-            }
-            invoker.destroy();
-        }
-    }
-
-    protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
-        if (this.serviceDiscoveryInvoker != null) {
-            this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
-            updateConsumerModel(currentAvailableInvoker, invoker);
-        }
-        if (invoker != null) {
-            if (logger.isDebugEnabled()) {
-                List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers();
-                logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size()));
+            if (invoker.getDirectory().isNotificationReceived()) {
+                if (logger.isInfoEnabled()) {
+                    logger.info("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+                }
+                invoker.destroy();
             }
-            //invoker.getDirectory().discordAddresses();
         }
     }
+//
+//    protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
+//        if (this.serviceDiscoveryInvoker != null) {
+//            this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
+//            updateConsumerModel(currentAvailableInvoker, invoker);
+//        }
+//        if (invoker != null) {
+//            if (logger.isDebugEnabled()) {
+//                List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers();
+//                logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size()));
+//            }
+//            //invoker.getDirectory().discordAddresses();
+//        }
+//    }
 
     private void clearListener(ClusterInvoker<T> invoker) {
         if (invoker == null) return;
@@ -410,9 +411,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
             if (workingInvoker != null) {
                 consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker", workingInvoker);
             }
-            if (backInvoker != null) {
-                consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker);
-            }
+//            if (backInvoker != null) {
+//                consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker);
+//            }
         }
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
index 5972e06..b9718f4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
@@ -30,8 +30,8 @@ import java.util.Set;
 
 @Activate
 public class MigrationRuleHandler<T> {
+    public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
     private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
-    private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
 
     private MigrationClusterInvoker<T> migrationInvoker;
     private MigrationStep currentStep;
@@ -119,8 +119,10 @@ public class MigrationRuleHandler<T> {
         }
 
         if (step == MigrationStep.APPLICATION_FIRST) {
+            setCurrentStepAndThreshold(step, threshold);
             migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(false);
         } else if (step == MigrationStep.FORCE_APPLICATION) {
+            setCurrentStepAndThreshold(step, threshold);
             migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true);
         }
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 399a260..85c7349 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -67,20 +67,24 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
 
     public MigrationRuleListener() {
         this.configuration = ApplicationModel.getEnvironment().getDynamicConfiguration().orElse(null);
+
+        String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule();
+        String defaultRawRule = StringUtils.isEmpty(localRawRule) ? INIT : localRawRule;
+
         if (this.configuration != null) {
             logger.info("Listening for migration rules on dataId " + RULE_KEY + ", group " + DUBBO_SERVICEDISCOVERY_MIGRATION);
             configuration.addListener(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, this);
 
             String rawRule = configuration.getConfig(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION);
             if (StringUtils.isEmpty(rawRule)) {
-                rawRule = INIT;
+                rawRule = defaultRawRule;
             }
             this.rawRule = rawRule;
         } else {
             if (logger.isWarnEnabled()) {
                 logger.warn("Using default configuration rule because config center is not configured!");
             }
-            rawRule = INIT;
+            rawRule = defaultRawRule;
         }
 //        process(new ConfigChangedEvent(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, rawRule));
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 8b29483..c7b0daa 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -251,20 +251,18 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
         this.invokersChangedListener = listener;
         if (invokersChangedListener != null && invokersChanged) {
             invokersChangedListener.onChange();
-            invokersChanged = false;
         }
     }
 
     protected synchronized void invokersChanged() {
         invokersChanged = true;
-        if (invokersChangedListener != null && invokersChanged) {
+        if (invokersChangedListener != null) {
             invokersChangedListener.onChange();
-            invokersChanged = false;
         }
     }
 
-    public synchronized void markInvokersChanged() {
-        this.invokersChanged = true;
+    public boolean isNotificationReceived() {
+        return invokersChanged;
     }
 
     protected abstract void destroyAllInvokers();
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
index 85b63de..e2909ce 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.registry.client;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static java.lang.String.valueOf;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,7 +36,7 @@ public class DefaultServiceInstanceTest {
     public DefaultServiceInstance instance;
 
     public static DefaultServiceInstance createInstance() {
-        DefaultServiceInstance instance = new DefaultServiceInstance(valueOf(System.nanoTime()), "A", "127.0.0.1", 8080);
+        DefaultServiceInstance instance = new DefaultServiceInstance("A", "127.0.0.1", 8080);
         instance.getMetadata().put("dubbo.metadata-service.urls", "[ \"dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedU [...]
         instance.getMetadata().put("dubbo.metadata-service.url-params", "{\"dubbo\":{\"application\":\"dubbo-provider-demo\",\"deprecated\":\"false\",\"group\":\"dubbo-provider-demo\",\"version\":\"1.0.0\",\"timestamp\":\"1564845042651\",\"dubbo\":\"2.0.2\",\"provider.host\":\"192.168.0.102\",\"provider.port\":\"20880\"}}");
         return instance;
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
index 8677381..65527df 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URLBuilder;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.createInstance;
@@ -29,6 +30,7 @@ import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.create
  *
  * @since 2.7.5
  */
+@Disabled("FileSystemServiceDiscovery implementation is not stable enough at present")
 public class FileSystemServiceDiscoveryTest {
 
     private FileSystemServiceDiscovery serviceDiscovery;
diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
index 084376b..15bf2d1 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
@@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.utils.DefaultPage;
 import org.apache.dubbo.common.utils.Page;
-import org.apache.dubbo.event.ConditionalEventListener;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceDiscoveryFactory;
 import org.apache.dubbo.registry.client.ServiceInstance;
@@ -33,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 public class MultipleServiceDiscovery implements ServiceDiscovery {
     public static final String REGISTRY_PREFIX_KEY = "child.";
@@ -88,16 +88,23 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
 
     @Override
     public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
-        MultiServiceInstancesChangedListener multiListener = new MultiServiceInstancesChangedListener(listener);
+        MultiServiceInstancesChangedListener multiListener = (MultiServiceInstancesChangedListener) listener;
 
         for (String registryKey : serviceDiscoveries.keySet()) {
-            SingleServiceInstancesChangedListener singleListener = new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscoveries.get(registryKey), multiListener);
-            multiListener.putSingleListener(registryKey, singleListener);
-            serviceDiscoveries.get(registryKey).addServiceInstancesChangedListener(singleListener);
+            ServiceDiscovery serviceDiscovery = serviceDiscoveries.get(registryKey);
+            SingleServiceInstancesChangedListener singleListener = multiListener.getAndComputeIfAbsent(registryKey, k -> {
+                return new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscovery, multiListener);
+            });
+            serviceDiscovery.addServiceInstancesChangedListener(singleListener);
         }
     }
 
     @Override
+    public ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+        return new MultiServiceInstancesChangedListener(serviceNames, this);
+    }
+
+    @Override
     public Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws NullPointerException, IllegalArgumentException, UnsupportedOperationException {
 
         List<ServiceInstance> serviceInstanceList = new ArrayList<>();
@@ -123,17 +130,12 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
         return serviceInstance;
     }
 
-    protected static class MultiServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> {
-        private final ServiceInstancesChangedListener sourceListener;
-        private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap = new ConcurrentHashMap<>();
-
-        public MultiServiceInstancesChangedListener(ServiceInstancesChangedListener sourceListener) {
-            this.sourceListener = sourceListener;
-        }
+    protected static class MultiServiceInstancesChangedListener extends ServiceInstancesChangedListener {
+        private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap;
 
-        @Override
-        public boolean accept(ServiceInstancesChangedEvent event) {
-            return sourceListener.getServiceNames().contains(event.getServiceName());
+        public MultiServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) {
+            super(serviceNames, serviceDiscovery);
+            this.singleListenerMap = new ConcurrentHashMap<>();
         }
 
         @Override
@@ -149,12 +151,16 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
                 }
             }
 
-            sourceListener.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances));
+            super.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances));
         }
 
         public void putSingleListener(String registryKey, SingleServiceInstancesChangedListener singleListener) {
             singleListenerMap.put(registryKey, singleListener);
         }
+
+        public SingleServiceInstancesChangedListener getAndComputeIfAbsent(String registryKey, Function<String, SingleServiceInstancesChangedListener> func) {
+            return singleListenerMap.computeIfAbsent(registryKey, func);
+        }
     }
 
     protected static class SingleServiceInstancesChangedListener extends ServiceInstancesChangedListener {
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
index 7542b5e..abc7467 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
@@ -57,7 +57,6 @@ public class NacosNamingServiceUtils {
      */
     public static Instance toInstance(ServiceInstance serviceInstance) {
         Instance instance = new Instance();
-        instance.setInstanceId(serviceInstance.getId());
         instance.setServiceName(serviceInstance.getServiceName());
         instance.setIp(serviceInstance.getHost());
         instance.setPort(serviceInstance.getPort());
@@ -75,8 +74,7 @@ public class NacosNamingServiceUtils {
      * @since 2.7.5
      */
     public static ServiceInstance toServiceInstance(Instance instance) {
-        DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getInstanceId(),
-                NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort());
+        DefaultServiceInstance serviceInstance = new DefaultServiceInstance(NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort());
         serviceInstance.setMetadata(instance.getMetadata());
         serviceInstance.setEnabled(instance.isEnabled());
         serviceInstance.setHealthy(instance.isHealthy());
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index dce7d20..549b803 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.utils.Page;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.rpc.RpcException;
 
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 
 import static org.apache.dubbo.common.function.ThrowableFunction.execute;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated;
@@ -199,18 +201,28 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
             throw new IllegalStateException("registerServiceWatcher create path=" + path + " fail.", e);
         }
 
-        CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
-                new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener));
-        try {
-            curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
-        } catch (KeeperException.NoNodeException e) {
-            // ignored
-            if (logger.isErrorEnabled()) {
-                logger.error(e.getMessage());
+        CountDownLatch latch = new CountDownLatch(1);
+        ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(path, key -> {
+            ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch);
+            try {
+                curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path);
+            } catch (KeeperException.NoNodeException e) {
+                // ignored
+                if (logger.isErrorEnabled()) {
+                    logger.error(e.getMessage());
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException(e.getMessage(), e);
             }
-        } catch (Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+            return tmpWatcher;
+        });
+        watcher.addListener(listener);
+        listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName)));
+        latch.countDown();
+    }
+
+    public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher watcher) throws Exception {
+        curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
     }
 
     private String buildServicePath(String serviceName) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index e34d3e4..8cffc3e 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -16,8 +16,10 @@
  */
 package org.apache.dubbo.registry.zookeeper;
 
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
 import org.apache.dubbo.registry.RegistryNotifier;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 
@@ -25,6 +27,10 @@ import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
 import static org.apache.dubbo.rpc.model.ApplicationModel.getExecutorRepository;
 import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged;
 import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
@@ -37,7 +43,7 @@ import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
  * @since 2.7.5
  */
 public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
-    private ServiceInstancesChangedListener listener;
+    private Set<ServiceInstancesChangedListener> listeners = new ConcurrentHashSet<>();
 
     private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
 
@@ -47,34 +53,52 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
 
     private final String serviceName;
 
+    private final String path;
+
+    private CountDownLatch latch;
+
     public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery,
                                                   String serviceName,
-                                                  ServiceInstancesChangedListener listener) {
+                                                  String path,
+                                                  CountDownLatch latch) {
         this.zookeeperServiceDiscovery = zookeeperServiceDiscovery;
         this.serviceName = serviceName;
-        this.listener = listener;
+        this.path = path;
+        this.latch = latch;
         this.notifier = new RegistryNotifier(zookeeperServiceDiscovery.getDelay(), getExecutorRepository().getServiceDiscoveryAddressNotificationExecutor()) {
             @Override
             protected void doNotify(Object rawAddresses) {
-                listener.onEvent((ServiceInstancesChangedEvent)rawAddresses);
+                listeners.forEach(listener -> listener.onEvent((ServiceInstancesChangedEvent)rawAddresses));
             }
         };
     }
 
     @Override
     public void process(WatchedEvent event) throws Exception {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+        }
 
         Watcher.Event.EventType eventType = event.getType();
 
         if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
             if (shouldKeepWatching()) {
-                notifier.notify(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
-                zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
-                zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+                zookeeperServiceDiscovery.reRegisterWatcher(this);
+                List<ServiceInstance> instanceList = zookeeperServiceDiscovery.getInstances(serviceName);
+                notifier.notify(new ServiceInstancesChangedEvent(serviceName, instanceList));
             }
         }
     }
 
+    public String getPath() {
+        return path;
+    }
+
+    public void addListener(ServiceInstancesChangedListener listener) {
+        listeners.add(listener);
+    }
+
     public boolean shouldKeepWatching() {
         return keepWatching;
     }
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 278aae7..ed9be54 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -85,7 +85,7 @@ public abstract class CuratorFrameworkUtils {
         String host = instance.getAddress();
         int port = instance.getPort();
         ZookeeperInstance zookeeperInstance = instance.getPayload();
-        DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getId(), name, host, port);
+        DefaultServiceInstance serviceInstance = new DefaultServiceInstance(name, host, port);
         serviceInstance.setMetadata(zookeeperInstance.getMetadata());
         return serviceInstance;
     }
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
index 5f3b680..359d62a 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import static java.util.Arrays.asList;
 import static org.apache.dubbo.common.utils.NetUtils.getAvailablePort;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.INSTANCE_REVISION_UPDATED_KEY;
-import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.generateId;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -104,7 +103,7 @@ public class ZookeeperServiceDiscoveryTest {
     }
 
     private DefaultServiceInstance createServiceInstance(String serviceName, String host, int port) {
-        return new DefaultServiceInstance(generateId(host, port), serviceName, host, port);
+        return new DefaultServiceInstance(serviceName, host, port);
     }
 
 //    @Test
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
new file mode 100644
index 0000000..d0b7848
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+public interface BaseFilter {
+    /**
+     * Make sure to call invoker.invoke() in your implementation.
+     */
+    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
+
+    interface Listener {
+
+        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
+
+        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
index 58e8215..74acb51 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
@@ -33,6 +33,32 @@ import org.apache.dubbo.common.extension.SPI;
  *    Caching is implemented in dubbo using filter approach. If cache is configured for invocation then before
  *    remote call configured caching type's (e.g. Thread Local, JCache etc) implementation invoke method gets called.
  * </pre>
+ *
+ * Starting from 3.0, the semantics of the Filter component on the consumer side have changed.
+ * Instead of intercepting a specific invoker instance, Filter in 3.0 now intercepts ClusterInvoker. A new SPI named
+ * InstanceFilter is introduced to provide the same semantics as Filter in 2.x.
+ *
+ * The difference between the 3.x and 2.x Filter call chains is as follows:
+ *
+ * 3.x Filter
+ *
+ *                                             -> InstanceFilter -> Invoker
+ *
+ * Proxy -> Filter -> Filter -> ClusterInvoker -> InstanceFilter -> Invoker
+ *
+ *                                             -> InstanceFilter -> Invoker
+ *
+ *
+ * 2.x Filter
+ *
+ *                            Filter -> Filter -> Invoker
+ *
+ * Proxy -> ClusterInvoker -> Filter -> Filter -> Invoker
+ *
+ *                            Filter -> Filter -> Invoker
+ *
+ * If you want to a Filter
+ *
  * Filter. (SPI, Singleton, ThreadSafe)
  *
  * @see org.apache.dubbo.rpc.filter.GenericFilter
@@ -41,17 +67,5 @@ import org.apache.dubbo.common.extension.SPI;
  * @see org.apache.dubbo.rpc.filter.TpsLimitFilter
  */
 @SPI
-public interface Filter {
-    /**
-     * Make sure call invoker.invoke() in your implementation.
-     */
-    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
-
-    interface Listener {
-
-        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
-
-        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
-    }
-
+public interface Filter extends BaseFilter {
 }
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index d7e8c44..33a9b64 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class AbstractInvoker<T> implements Invoker<T> {
 
-    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class);
 
     private final Class<T> type;
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 5255f55..7c526c2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -3,10 +3,8 @@ generic=org.apache.dubbo.rpc.filter.GenericFilter
 genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
 token=org.apache.dubbo.rpc.filter.TokenFilter
 accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
-activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
 classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
 context=org.apache.dubbo.rpc.filter.ContextFilter
-consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
 exception=org.apache.dubbo.rpc.filter.ExceptionFilter
 executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
 deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
index ee922af..ad95481 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
@@ -20,11 +20,11 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.AsyncMethodInfo;
 import org.apache.dubbo.rpc.model.ConsumerModel;
@@ -39,7 +39,7 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.ASYNC_METHOD_INFO;
  * EventFilter
  */
 @Activate(group = CommonConstants.CONSUMER)
-public class FutureFilter implements Filter, Filter.Listener {
+public class FutureFilter implements ClusterFilter, ClusterFilter.Listener {
 
     protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 79a7a38..ee41594 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -1,2 +1 @@
-trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
-future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
\ No newline at end of file
+trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..4783214
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1 @@
+future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
index 1a1b0fd..d34f49a 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
@@ -18,11 +18,11 @@ package org.apache.dubbo.rpc.protocol.dubbo;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
 import org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter;
 import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
 
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock;
  */
 public class FutureFilterTest {
     private static RpcInvocation invocation;
-    private Filter eventFilter = new FutureFilter();
+    private ClusterFilter eventFilter = new FutureFilter();
 
     @BeforeAll
     public static void setUp() {