You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/03/24 07:25:05 UTC
[dubbo] 04/12: [3.0] fix service discovery implementation and
introduce ClusterFiIlter (#7388)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 70a92457159e1b27bb7ee367118b7e32e20eb05c
Author: ken.lj <ke...@gmail.com>
AuthorDate: Wed Mar 17 00:03:09 2021 +0800
[3.0] fix service discovery implementation and introduce ClusterFiIlter (#7388)
---
.../org/apache/dubbo/rpc/cluster/Directory.java | 4 +
.../dubbo/rpc/cluster/filter/ClusterFilter.java | 24 +++
.../cluster/filter/DefaultFilterChainBuilder.java | 10 +-
.../rpc/cluster/filter/FilterChainBuilder.java | 36 +++-
.../rpc/cluster/filter/ProtocolFilterWrapper.java | 9 +-
.../filter/support}/ConsumerContextFilter.java | 26 ++-
.../support/ZoneAwareFilter.java} | 19 ++-
.../cluster/interceptor/ClusterInterceptor.java | 1 +
.../ConsumerContextClusterInterceptor.java | 60 -------
.../cluster/support/wrapper/AbstractCluster.java | 190 +++++++++++----------
...g.apache.dubbo.rpc.cluster.filter.ClusterFilter | 2 +
...ubbo.rpc.cluster.interceptor.ClusterInterceptor | 2 -
dubbo-common/pom.xml | 4 +
.../java/org/apache/dubbo/common/URLStrParser.java | 9 +-
.../apache/dubbo/common/config/Environment.java | 20 +++
.../dubbo/common/constants/CommonConstants.java | 6 +
.../dubbo/common/url/component/URLAddress.java | 2 +-
.../dubbo/common/url/component/URLItemCache.java | 44 +++--
.../dubbo/common/url/component/URLParam.java | 4 +-
.../org/apache/dubbo/common/utils/ConfigUtils.java | 47 +++++
.../org/apache/dubbo/config/AbstractConfig.java | 3 +-
.../java/org/apache/dubbo/config/Constants.java | 2 +
.../dubbo/config/bootstrap/DubboBootstrap.java | 3 +-
.../ServiceInstanceHostPortCustomizer.java | 5 +-
.../apache/dubbo/config/AbstractConfigTest.java | 4 +-
.../src/main/resources/dubbo-migration.yaml | 3 +
dubbo-dependencies-bom/pom.xml | 6 +
dubbo-distribution/dubbo-all/pom.xml | 8 +
.../org/apache/dubbo/metadata/MetadataInfo.java | 4 +
.../dubbo/monitor/support/MonitorFilter.java | 3 +-
.../dubbo/auth/filter/ConsumerSignFilter.java | 2 +-
.../org/apache/dubbo/registry/NotifyListener.java | 4 +
.../registry/client/DefaultServiceInstance.java | 58 +++----
.../client/EventPublishingServiceDiscovery.java | 15 ++
.../client/FileSystemServiceDiscovery.java | 2 +-
.../client/SelfHostMetaServiceDiscovery.java | 2 +-
.../dubbo/registry/client/ServiceDiscovery.java | 5 +
.../registry/client/ServiceDiscoveryRegistry.java | 2 +-
.../client/ServiceDiscoveryRegistryDirectory.java | 5 +-
.../dubbo/registry/client/ServiceInstance.java | 13 +-
.../listener/ServiceInstancesChangedListener.java | 117 +++++++------
.../registry/client/metadata/MetadataUtils.java | 2 +-
.../metadata/store/RemoteMetadataServiceImpl.java | 2 +-
.../DefaultMigrationAddressComparator.java | 4 +-
.../client/migration/MigrationInvoker.java | 89 +++++-----
.../client/migration/MigrationRuleHandler.java | 4 +-
.../client/migration/MigrationRuleListener.java | 8 +-
.../registry/integration/DynamicDirectory.java | 8 +-
.../client/DefaultServiceInstanceTest.java | 3 +-
.../client/FileSystemServiceDiscoveryTest.java | 2 +
.../multiple/MultipleServiceDiscovery.java | 38 +++--
.../nacos/util/NacosNamingServiceUtils.java | 4 +-
.../zookeeper/ZookeeperServiceDiscovery.java | 34 ++--
.../ZookeeperServiceDiscoveryChangeWatcher.java | 38 ++++-
.../zookeeper/util/CuratorFrameworkUtils.java | 2 +-
.../zookeeper/ZookeeperServiceDiscoveryTest.java | 3 +-
.../main/java/org/apache/dubbo/rpc/BaseFilter.java | 31 ++++
.../src/main/java/org/apache/dubbo/rpc/Filter.java | 40 +++--
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +-
.../dubbo/internal/org.apache.dubbo.rpc.Filter | 2 -
.../rpc/protocol/dubbo/filter/FutureFilter.java | 4 +-
.../dubbo/internal/org.apache.dubbo.rpc.Filter | 3 +-
...g.apache.dubbo.rpc.cluster.filter.ClusterFilter | 1 +
.../dubbo/rpc/protocol/dubbo/FutureFilterTest.java | 4 +-
64 files changed, 688 insertions(+), 425 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
index 9fd3ca2..5a92d97 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
@@ -65,4 +65,8 @@ public interface Directory<T> extends Node {
void discordAddresses();
RouterChain<T> getRouterChain();
+
+ default boolean isNotificationReceived() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java
new file mode 100644
index 0000000..7d48dc9
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.filter;
+
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.BaseFilter;
+
+@SPI
+public interface ClusterFilter extends BaseFilter {
+}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
index 982008b..e2e26d2 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
@@ -27,6 +27,9 @@ import java.util.List;
@Activate(order = 0)
public class DefaultFilterChainBuilder implements FilterChainBuilder {
+ /**
+ * build consumer/provider filter chain
+ */
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
@@ -43,14 +46,17 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
return last;
}
+ /**
+ * build consumer cluster filter chain
+ */
@Override
public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) {
ClusterInvoker<T> last = originalInvoker;
- List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
+ List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
- final Filter filter = filters.get(i);
+ final ClusterFilter filter = filters.get(i);
final Invoker<T> next = last;
last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
index 20275e2..949a66c 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.cluster.filter;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -29,16 +30,27 @@ import org.apache.dubbo.rpc.cluster.Directory;
@SPI("default")
public interface FilterChainBuilder {
+ /**
+ * build consumer/provider filter chain
+ */
<T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group);
+ /**
+ * build consumer cluster filter chain
+ */
<T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group);
- class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{
+ /**
+ * Works on provider side
+ * @param <T>
+ * @param <TYPE>
+ */
+ class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
TYPE originalInvoker;
Invoker<T> nextNode;
- Filter filter;
+ FILTER filter;
- public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
+ public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
this.originalInvoker = originalInvoker;
this.nextNode = nextNode;
this.filter = filter;
@@ -79,8 +91,8 @@ public interface FilterChainBuilder {
} finally {
listenableFilter.removeListener(invocation);
}
- } else if (filter instanceof Filter.Listener) {
- Filter.Listener listener = (Filter.Listener) filter;
+ } else if (filter instanceof FILTER.Listener) {
+ FILTER.Listener listener = (FILTER.Listener) filter;
listener.onError(e, originalInvoker, invocation);
}
throw e;
@@ -102,8 +114,8 @@ public interface FilterChainBuilder {
} finally {
listenableFilter.removeListener(invocation);
}
- } else if (filter instanceof Filter.Listener) {
- Filter.Listener listener = (Filter.Listener) filter;
+ } else if (filter instanceof FILTER.Listener) {
+ FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
@@ -124,8 +136,14 @@ public interface FilterChainBuilder {
}
}
- class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> {
- public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) {
+ /**
+ * Works on consumer side
+ * @param <T>
+ * @param <TYPE>
+ */
+ class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
+ extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+ public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
super(originalInvoker, nextNode, filter);
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
index 65e8813..389b173 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java
@@ -28,10 +28,9 @@ import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.RpcException;
import java.util.List;
-import java.util.Objects;
+import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY;
/**
* ListenerProtocol
@@ -68,11 +67,7 @@ public class ProtocolFilterWrapper implements Protocol {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
- // if it's peer-to-peer url
- if (!Objects.isNull(url.getAttribute(PEER_KEY))) {
- return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER);
- }
- return protocol.refer(type, url);
+ return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
similarity index 83%
rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 0fead6b..4a1ed3b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.filter;
+package org.apache.dubbo.rpc.cluster.filter.support;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -28,6 +28,7 @@ import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import java.util.Map;
@@ -39,11 +40,11 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_K
* ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
* for consumer invoker.It does it to make the requires info available to execution thread's RpcContext.
*
- * @see org.apache.dubbo.rpc.Filter
+ * @see Filter
* @see RpcContext
*/
@Activate(group = CONSUMER, order = -10000)
-public class ConsumerContextFilter implements Filter {
+public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
@@ -76,7 +77,24 @@ public class ConsumerContextFilter implements Filter {
+ invocation.getMethodName() + ", terminate directly."), invocation);
}
}
- return invoker.invoke(invocation);
+
+ try {
+ RpcContext.removeServerContext();
+ return invoker.invoke(invocation);
+ } finally {
+ RpcContext.removeContext();
+ }
+ }
+
+ @Override
+ public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+ // pass attachments to result
+ RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
+ }
+
+ @Override
+ public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
similarity index 80%
rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java
rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
index 6daec08..cd0a7ab 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.cluster.interceptor;
+package org.apache.dubbo.rpc.cluster.filter.support;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ZoneDetector;
-import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE;
@@ -32,11 +36,11 @@ import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_
*
* active only when url has key 'cluster=zone-aware'
*/
-@Activate(value = "cluster:zone-aware")
-public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
+@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware")
+public class ZoneAwareFilter implements ClusterFilter {
@Override
- public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
+ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
@@ -53,10 +57,7 @@ public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
if (StringUtils.isNotEmpty(force)) {
invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
}
- }
-
- @Override
- public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
+ return invoker.invoke(invocation);
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
index 199361f..821dd2e 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
/**
* Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked.
*/
+@Deprecated
@SPI
public interface ClusterInterceptor {
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
deleted file mode 100644
index 053bc87..0000000
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.cluster.interceptor;
-
-import org.apache.dubbo.common.extension.Activate;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
-
-@Activate
-public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
-
- @Override
- public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
- RpcContext context = RpcContext.getContext();
- context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
- if (invocation instanceof RpcInvocation) {
- ((RpcInvocation) invocation).setInvoker(invoker);
- }
- RpcContext.removeServerContext();
- }
-
- @Override
- public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
- RpcContext.removeContext(true);
- }
-
- @Override
- public Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
- return clusterInvoker.invoke(invocation);
- }
-
- @Override
- public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
- RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
- }
-
- @Override
- public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) {
-
- }
-}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
index 9ce920d..1c2d047 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.cluster.support.wrapper;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -36,6 +37,7 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
import java.util.List;
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_INTERCEPTOR_COMPATIBLE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INVOCATION_INTERCEPTOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_INTERCEPTOR_KEY;
@@ -44,15 +46,10 @@ public abstract class AbstractCluster implements Cluster {
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
// AbstractClusterInvoker<T> last = clusterInvoker;
- AbstractClusterInvoker<T> last = buildInterceptorInvoker(new FilterInvoker<>(clusterInvoker));
- List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions();
+ AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker));
- if (!interceptors.isEmpty()) {
- for (int i = interceptors.size() - 1; i >= 0; i--) {
- final ClusterInterceptor interceptor = interceptors.get(i);
- final AbstractClusterInvoker<T> next = last;
- last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
- }
+ if (Boolean.parseBoolean(ConfigurationUtils.getProperty(CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) {
+ return build27xCompatibleClusterInterceptors(clusterInvoker, last);
}
return last;
}
@@ -70,90 +67,15 @@ public abstract class AbstractCluster implements Cluster {
if (CollectionUtils.isEmpty(builders)) {
return invoker;
}
- return new InterceptorInvoker<>(invoker, builders);
+ return new InvocationInterceptorInvoker<>(invoker, builders);
}
protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;
- static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
-
- private AbstractClusterInvoker<T> clusterInvoker;
- private ClusterInterceptor interceptor;
- private AbstractClusterInvoker<T> next;
-
- public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker,
- ClusterInterceptor interceptor,
- AbstractClusterInvoker<T> next) {
- this.clusterInvoker = clusterInvoker;
- this.interceptor = interceptor;
- this.next = next;
- }
-
- @Override
- public Class<T> getInterface() {
- return clusterInvoker.getInterface();
- }
-
- @Override
- public URL getUrl() {
- return clusterInvoker.getUrl();
- }
-
- @Override
- public boolean isAvailable() {
- return clusterInvoker.isAvailable();
- }
-
- @Override
- public Result invoke(Invocation invocation) throws RpcException {
- Result asyncResult;
- try {
- interceptor.before(next, invocation);
- asyncResult = interceptor.intercept(next, invocation);
- } catch (Exception e) {
- // onError callback
- if (interceptor instanceof ClusterInterceptor.Listener) {
- ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
- listener.onError(e, clusterInvoker, invocation);
- }
- throw e;
- } finally {
- interceptor.after(next, invocation);
- }
- return asyncResult.whenCompleteWithContext((r, t) -> {
- // onResponse callback
- if (interceptor instanceof ClusterInterceptor.Listener) {
- ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
- if (t == null) {
- listener.onMessage(r, clusterInvoker, invocation);
- } else {
- listener.onError(t, clusterInvoker, invocation);
- }
- }
- });
- }
-
- @Override
- public void destroy() {
- clusterInvoker.destroy();
- }
-
- @Override
- public String toString() {
- return clusterInvoker.toString();
- }
-
- @Override
- protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
- // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter.
- return null;
- }
- }
-
- static class FilterInvoker<T> extends AbstractClusterInvoker<T> {
+ static class ClusterFilterInvoker<T> extends AbstractClusterInvoker<T> {
private ClusterInvoker<T> filterInvoker;
- public FilterInvoker(AbstractClusterInvoker<T> invoker) {
+ public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) {
List<FilterChainBuilder> builders = ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
if (CollectionUtils.isEmpty(builders)) {
filterInvoker = invoker;
@@ -203,10 +125,10 @@ public abstract class AbstractCluster implements Cluster {
}
}
- static class InterceptorInvoker<T> extends AbstractClusterInvoker<T> {
+ static class InvocationInterceptorInvoker<T> extends AbstractClusterInvoker<T> {
private ClusterInvoker<T> interceptorInvoker;
- public InterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) {
+ public InvocationInterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) {
ClusterInvoker<T> tmpInvoker = invoker;
for (InvocationInterceptorBuilder builder : builders) {
tmpInvoker = builder.buildClusterInterceptorChain(tmpInvoker, INVOCATION_INTERCEPTOR_KEY, CommonConstants.CONSUMER);
@@ -248,4 +170,96 @@ public abstract class AbstractCluster implements Cluster {
return null;
}
}
+
+ @Deprecated
+ private <T> ClusterInvoker<T> build27xCompatibleClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, AbstractClusterInvoker<T> last) {
+ List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions();
+
+ if (!interceptors.isEmpty()) {
+ for (int i = interceptors.size() - 1; i >= 0; i--) {
+ final ClusterInterceptor interceptor = interceptors.get(i);
+ final AbstractClusterInvoker<T> next = last;
+ last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
+ }
+ }
+ return last;
+ }
+
+ @Deprecated
+ static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
+
+ private AbstractClusterInvoker<T> clusterInvoker;
+ private ClusterInterceptor interceptor;
+ private AbstractClusterInvoker<T> next;
+
+ public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker,
+ ClusterInterceptor interceptor,
+ AbstractClusterInvoker<T> next) {
+ this.clusterInvoker = clusterInvoker;
+ this.interceptor = interceptor;
+ this.next = next;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return clusterInvoker.getInterface();
+ }
+
+ @Override
+ public URL getUrl() {
+ return clusterInvoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return clusterInvoker.isAvailable();
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ Result asyncResult;
+ try {
+ interceptor.before(next, invocation);
+ asyncResult = interceptor.intercept(next, invocation);
+ } catch (Exception e) {
+ // onError callback
+ if (interceptor instanceof ClusterInterceptor.Listener) {
+ ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
+ listener.onError(e, clusterInvoker, invocation);
+ }
+ throw e;
+ } finally {
+ interceptor.after(next, invocation);
+ }
+ return asyncResult.whenCompleteWithContext((r, t) -> {
+ // onResponse callback
+ if (interceptor instanceof ClusterInterceptor.Listener) {
+ ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
+ if (t == null) {
+ listener.onMessage(r, clusterInvoker, invocation);
+ } else {
+ listener.onError(t, clusterInvoker, invocation);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void destroy() {
+ clusterInvoker.destroy();
+ }
+
+ @Override
+ public String toString() {
+ return clusterInvoker.toString();
+ }
+
+ @Override
+ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
+ // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter.
+ return null;
+ }
+ }
+
+
}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..8f70d31
--- /dev/null
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1,2 @@
+zone-aware=org.apache.dubbo.rpc.cluster.filter.support.ZoneAwareFilter
+consumercontext=org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor
deleted file mode 100644
index 3f3f008..0000000
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor
+++ /dev/null
@@ -1,2 +0,0 @@
-context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor
-zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor
\ No newline at end of file
diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml
index 44d9fab..b424661 100644
--- a/dubbo-common/pom.xml
+++ b/dubbo-common/pom.xml
@@ -72,6 +72,10 @@
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.collections</groupId>
+ <artifactId>eclipse-collections</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
index 619b1bb..5bd2970 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
@@ -21,8 +21,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.url.component.URLItemCache;
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
@@ -73,7 +74,7 @@ public final class URLStrParser {
}
TempBuf tempBuf = DECODE_TEMP_BUF.get();
- Map<String, String> params = new HashMap<>();
+ Map<String, String> params = new UnifiedMap<>();
int nameStart = from;
int valueStart = -1;
int i;
@@ -169,7 +170,7 @@ public final class URLStrParser {
}
// check cache
- protocol = URLItemCache.checkProtocol(protocol);
+ protocol = URLItemCache.intern(protocol);
path = URLItemCache.checkPath(path);
return new ServiceConfigURL(protocol, username, password, host, port, path, parameters);
@@ -233,7 +234,7 @@ public final class URLStrParser {
}
TempBuf tempBuf = DECODE_TEMP_BUF.get();
- Map<String, String> params = new HashMap<>();
+ Map<String, String> params = new UnifiedMap<>();
int nameStart = from;
int valueStart = -1;
int i;
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
index ea4b4e5..3da6cca 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java
@@ -17,11 +17,13 @@
package org.apache.dubbo.common.config;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.context.FrameworkExt;
import org.apache.dubbo.common.context.LifecycleAdapter;
import org.apache.dubbo.common.extension.DisableInject;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.config.AbstractConfig;
import org.apache.dubbo.config.ConfigCenterConfig;
import org.apache.dubbo.config.context.ConfigConfigurationAdapter;
@@ -54,6 +56,7 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
private boolean configCenterFirst = true;
private DynamicConfiguration dynamicConfiguration;
+ private String localMigrationRule;
public Environment() {
this.propertiesConfiguration = new PropertiesConfiguration();
@@ -76,6 +79,19 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
this.externalConfiguration.setProperties(externalConfigurationMap);
this.appExternalConfiguration.setProperties(appExternalConfigurationMap);
+
+ loadMigrationRule();
+ }
+
+ private void loadMigrationRule() {
+ String path = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY);
+ if (path == null || path.length() == 0) {
+ path = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY);
+ if (path == null || path.length() == 0) {
+ path = CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE;
+ }
+ }
+ this.localMigrationRule = ConfigUtils.loadMigrationRule(path);
}
@DisableInject
@@ -220,6 +236,10 @@ public class Environment extends LifecycleAdapter implements FrameworkExt {
return appExternalConfiguration;
}
+ public String getLocalMigrationRule() {
+ return localMigrationRule;
+ }
+
// For test
public void clearExternalConfigs() {
this.externalConfiguration.clear();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 268c313..953b11b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -45,6 +45,10 @@ public interface CommonConstants {
String DEFAULT_DUBBO_PROPERTIES = "dubbo.properties";
+ String DUBBO_MIGRATION_KEY = "dubbo.migration.file";
+
+ String DEFAULT_DUBBO_MIGRATION_FILE = "dubbo-migration.yaml";
+
String ANY_VALUE = "*";
/**
@@ -376,4 +380,6 @@ public interface CommonConstants {
String CACHE_CLEAR_TASK_INTERVAL = "dubbo.application.url.cache.task.interval";
String CACHE_CLEAR_WAITING_THRESHOLD = "dubbo.application.url.cache.clear.waiting";
+ String CLUSTER_INTERCEPTOR_COMPATIBLE_KEY = "dubbo.application.cluster.interceptor.compatible";
+
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
index 63d140b..a0e89f1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java
@@ -252,7 +252,7 @@ public class URLAddress implements Serializable {
}
// check cache
- protocol = URLItemCache.checkProtocol(protocol);
+ protocol = URLItemCache.intern(protocol);
path = URLItemCache.checkPath(path);
return new PathURLAddress(protocol, username, password, path, host, port, rawAddress);
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
index 2384493..54372c2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java
@@ -19,14 +19,13 @@ package org.apache.dubbo.common.url.component;
import org.apache.dubbo.common.utils.LRUCache;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
public class URLItemCache {
// thread safe with limited size, by default 1000
private static final Map<String, String> PARAM_KEY_CACHE = new LRUCache<>(10000);
- private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(100000);
+ private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(50000);
private static final Map<String, String> PATH_CACHE = new LRUCache<>(10000);
- private static final Map<String, String> PROTOCOL_CACHE = new ConcurrentHashMap<>();
+ private static final Map<String, String> REVISION_CACHE = new LRUCache<>(10000);
public static void putParams(Map<String, String> params, String key, String value) {
String cachedKey = PARAM_KEY_CACHE.get(key);
@@ -43,17 +42,6 @@ public class URLItemCache {
params.put(cachedKey, cachedValue);
}
- public static String checkProtocol(String _protocol) {
- if (_protocol == null) {
- return _protocol;
- }
- String cachedProtocol = PROTOCOL_CACHE.putIfAbsent(_protocol, _protocol);
- if (cachedProtocol != null) {
- return cachedProtocol;
- }
- return _protocol;
- }
-
public static String checkPath(String _path) {
if (_path == null) {
return _path;
@@ -64,4 +52,32 @@ public class URLItemCache {
}
return _path;
}
+
+ public static String checkRevision(String _revision) {
+ if (_revision == null) {
+ return _revision;
+ }
+ String revision = REVISION_CACHE.putIfAbsent(_revision, _revision);
+ if (revision != null) {
+ return revision;
+ }
+ return _revision;
+ }
+
+ public static String intern(String _protocol) {
+ if (_protocol == null) {
+ return _protocol;
+ }
+ return _protocol.intern();
+ }
+
+ public static void putParamsIntern(Map<String, String> params, String key, String value) {
+ if (key == null || value == null) {
+ params.put(key, value);
+ return;
+ }
+ key = key.intern();
+ value = value.intern();
+ params.put(key, value);
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
index b2dca0c..cea92cb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.common.URLStrParser;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
@@ -263,7 +265,7 @@ public class URLParam implements Serializable {
public static URLParam parse(String rawParam) {
String[] parts = rawParam.split("&");
- Map<String, String> parameters = new HashMap<>((int) (parts.length/.75f) + 1);
+ Map<String, String> parameters = new UnifiedMap<>((int) (parts.length/.75f) + 1);
for (String part : parts) {
part = part.trim();
if (part.length() > 0) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
index c1f4711..d0d0693 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java
@@ -21,12 +21,16 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@@ -296,6 +300,49 @@ public class ConfigUtils {
return properties;
}
+ public static String loadMigrationRule(String fileName) {
+ String rawRule = "";
+ if (checkFileNameExist(fileName)) {
+ try {
+ try (FileInputStream input = new FileInputStream(fileName)) {
+ rawRule = readString(input);
+ }
+ } catch (Throwable e) {
+ logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
+ }
+ return rawRule;
+ }
+
+ try {
+ InputStream is = ClassUtils.getClassLoader().getResourceAsStream(fileName);
+ if (is != null) {
+ rawRule = readString(is);
+ }
+ } catch (Throwable e) {
+ logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e);
+ }
+ return rawRule;
+ }
+
+ private static String readString(InputStream is) {
+ StringBuilder stringBuilder = new StringBuilder();
+ char[] buffer = new char[10];
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))){
+ int n;
+ while ((n = reader.read(buffer)) != -1) {
+ if (n < 10) {
+ buffer = Arrays.copyOf(buffer, n);
+ }
+ stringBuilder.append(String.valueOf(buffer));
+ buffer = new char[10];
+ }
+ } catch (IOException e) {
+ logger.error("Read migration file error.", e);
+ }
+
+ return stringBuilder.toString();
+ }
+
/**
* check if the fileName can be found in filesystem
*
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
index 4e7bbf4..47556c5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
@@ -36,6 +36,7 @@ import javax.annotation.PostConstruct;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -336,7 +337,7 @@ public abstract class AbstractConfig implements Serializable {
String value = entry.getValue();
result.put(pre + key, value);
// For compatibility, key like "registry-type" will has a duplicate key "registry.type"
- if (key.contains("-")) {
+ if (Arrays.binarySearch(Constants.DOT_COMPATIBLE_KEYS, key) != -1) {
result.put(pre + key.replace('-', '.'), value);
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
index f8fed84..894f138 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java
@@ -117,4 +117,6 @@ public interface Constants {
String REGISTER_KEY = "register";
String MULTI_SERIALIZATION_KEY = "serialize.multiple";
+
+ String[] DOT_COMPATIBLE_KEYS = new String[]{"qos-enable", "qos-port", "qos-accept-foreign-ip"};
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index a4116f4..cdf66dd 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -520,6 +520,7 @@ public class DubboBootstrap extends GenericEventListener {
}
ApplicationModel.initFrameworkExts();
+
startConfigCenter();
@@ -1162,7 +1163,7 @@ public class DubboBootstrap extends GenericEventListener {
private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
// register instance only when at least one service is exported.
- if (serviceInstance.getPort() != null && serviceInstance.getPort() != -1) {
+ if (serviceInstance.getPort() > 0) {
publishMetadataToRemote(serviceInstance);
logger.info("Start registering instance address to registry.");
getServiceDiscoveries().forEach(serviceDiscovery ->
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
index 3831337..8693828 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java
@@ -36,14 +36,14 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi
@Override
public void customize(ServiceInstance serviceInstance) {
- if (serviceInstance.getPort() != null) {
+ if (serviceInstance.getPort() > 0) {
return;
}
WritableMetadataService writableMetadataService = WritableMetadataService.getDefaultExtension();
String host = null;
- Integer port = null;
+ int port = -1;
Set<URL> urls = writableMetadataService.getExportedServiceURLs();
if (CollectionUtils.isNotEmpty(urls)) {
String preferredProtocol = ApplicationModel.getApplicationConfig().getProtocol();
@@ -64,7 +64,6 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi
DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance;
instance.setHost(host);
instance.setPort(port);
- instance.setId(host + ":" + port);
}
}
}
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
index 041cd75..04d5f72 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java
@@ -116,8 +116,6 @@ public class AbstractConfigTest {
Assertions.assertEquals("ONE,1", parameters.get("prefix.num"));
Assertions.assertEquals("hello%2Fworld", parameters.get("prefix.naming"));
Assertions.assertEquals("30", parameters.get("prefix.age"));
- Assertions.assertTrue(parameters.containsKey("prefix.key-2"));
- Assertions.assertTrue(parameters.containsKey("prefix.key.2"));
Assertions.assertFalse(parameters.containsKey("prefix.secret"));
}
@@ -807,7 +805,7 @@ public class AbstractConfigTest {
public Map getParameters() {
Map<String, String> map = new HashMap<String, String>();
map.put("key.1", "one");
- map.put("key-2", "two");
+ map.put("key.2", "two");
return map;
}
}
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml
new file mode 100644
index 0000000..dc2e8f2
--- /dev/null
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml
@@ -0,0 +1,3 @@
+key: demo-consumer
+step: FORCE_APPLICATION
+threshold: 0.1
\ No newline at end of file
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index e6b67c1..8cc1d43 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -91,6 +91,7 @@
<!-- Common libs -->
<spring_version>4.3.16.RELEASE</spring_version>
<javassist_version>3.20.0-GA</javassist_version>
+ <eclipse_collections_version>10.4.0</eclipse_collections_version>
<netty_version>3.2.5.Final</netty_version>
<netty4_version>4.1.56.Final</netty4_version>
<mina_version>1.1.7</mina_version>
@@ -186,6 +187,11 @@
<version>${javassist_version}</version>
</dependency>
<dependency>
+ <groupId>org.eclipse.collections</groupId>
+ <artifactId>eclipse-collections</artifactId>
+ <version>${eclipse_collections_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty_version}</version>
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index 5729680..b464e41 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -312,6 +312,10 @@
<artifactId>javassist</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.collections</groupId>
+ <artifactId>eclipse-collections</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
@@ -513,6 +517,10 @@
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.InvokerListener</resource>
</transformer>
<transformer
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 024328e..4106ad2 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -42,6 +42,8 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPAR
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
public class MetadataInfo implements Serializable {
+ public static final MetadataInfo EMPTY = new MetadataInfo();
+
private String app;
private String revision;
private Map<String, ServiceInfo> services;
@@ -50,6 +52,8 @@ public class MetadataInfo implements Serializable {
private transient Map<String, String> extendParams;
private transient AtomicBoolean reported = new AtomicBoolean(false);
+ public MetadataInfo() {}
+
public MetadataInfo(String app) {
this(app, null, null);
}
diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
index 1492e0b..47e4bc7 100644
--- a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
+++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
@@ -36,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
@@ -49,7 +48,7 @@ import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
/**
* MonitorFilter. (SPI, Singleton, ThreadSafe)
*/
-@Activate(group = {PROVIDER, CONSUMER})
+@Activate(group = {PROVIDER})
public class MonitorFilter implements Filter, Filter.Listener {
private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
diff --git a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
index cf984a5..96438c5 100644
--- a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
+++ b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java
@@ -33,7 +33,7 @@ import org.apache.dubbo.rpc.RpcException;
*
* @see org.apache.dubbo.rpc.Filter
*/
-@Activate(group = CommonConstants.CONSUMER, order = -10000)
+@Activate(group = CommonConstants.CONSUMER, value = Constants.SERVICE_AUTH, order = -10000)
public class ConsumerSignFilter implements Filter {
@Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
index 89e3e75..4c02cdc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
@@ -45,4 +45,8 @@ public interface NotifyListener {
default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
}
+ default URL getConsumerUrl() {
+ return null;
+ }
+
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 93ba70e..1bcf284 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client;
import org.apache.dubbo.metadata.MetadataInfo;
import com.alibaba.fastjson.JSON;
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
import java.util.HashMap;
import java.util.List;
@@ -39,24 +40,23 @@ public class DefaultServiceInstance implements ServiceInstance {
private static final long serialVersionUID = 1149677083747278100L;
- private String id;
-
private String serviceName;
private String host;
- private Integer port;
+ private int port;
private boolean enabled;
private boolean healthy;
- private Map<String, String> metadata = new HashMap<>();
+ private Map<String, String> metadata = new UnifiedMap<>();
private transient String address;
private transient MetadataInfo serviceMetadata;
// used at runtime
- private transient Map<String, String> extendParams = new HashMap<>();
+ private transient String registryCluster; // extendParams can be more flexiable, but one single property uses less space
+ private transient Map<String, String> extendParams;
private transient List<Endpoint> endpoints;
public DefaultServiceInstance() {
@@ -70,17 +70,16 @@ public class DefaultServiceInstance implements ServiceInstance {
this.healthy = other.healthy;
this.metadata = other.metadata;
this.serviceMetadata = other.serviceMetadata;
+ this.registryCluster = other.registryCluster;
this.extendParams = other.extendParams;
this.endpoints = other.endpoints;
this.address = null;
- this.id = null;
}
- public DefaultServiceInstance(String id, String serviceName, String host, Integer port) {
+ public DefaultServiceInstance(String serviceName, String host, Integer port) {
if (port != null && port.intValue() < 1) {
throw new IllegalArgumentException("The port must be greater than zero!");
}
- this.id = id;
this.serviceName = serviceName;
this.host = host;
this.port = port;
@@ -88,18 +87,10 @@ public class DefaultServiceInstance implements ServiceInstance {
this.healthy = true;
}
- public DefaultServiceInstance(String serviceName, String host, Integer port) {
- this(host + ":" + port, serviceName, host, port);
- }
-
public DefaultServiceInstance(String serviceName) {
this.serviceName = serviceName;
}
- public void setId(String id) {
- this.id = id;
- }
-
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
@@ -109,11 +100,6 @@ public class DefaultServiceInstance implements ServiceInstance {
}
@Override
- public String getId() {
- return id;
- }
-
- @Override
public String getServiceName() {
return serviceName;
}
@@ -123,12 +109,12 @@ public class DefaultServiceInstance implements ServiceInstance {
return host;
}
- public void setPort(Integer port) {
+ public void setPort(int port) {
this.port = port;
}
@Override
- public Integer getPort() {
+ public int getPort() {
return port;
}
@@ -173,7 +159,19 @@ public class DefaultServiceInstance implements ServiceInstance {
}
@Override
+ public String getRegistryCluster() {
+ return registryCluster;
+ }
+
+ public void setRegistryCluster(String registryCluster) {
+ this.registryCluster = registryCluster;
+ }
+
+ @Override
public Map<String, String> getExtendParams() {
+ if (extendParams == null) {
+ extendParams = new HashMap<>();
+ }
return extendParams;
}
@@ -187,16 +185,19 @@ public class DefaultServiceInstance implements ServiceInstance {
public DefaultServiceInstance copy(Endpoint endpoint) {
DefaultServiceInstance copyOfInstance = new DefaultServiceInstance(this);
copyOfInstance.setPort(endpoint.getPort());
- copyOfInstance.setId(copyOfInstance.getAddress());
return copyOfInstance;
}
@Override
public Map<String, String> getAllParams() {
- Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1));
- allParams.putAll(metadata);
- allParams.putAll(extendParams);
- return allParams;
+ if (extendParams == null) {
+ return metadata;
+ } else {
+ Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1));
+ allParams.putAll(metadata);
+ allParams.putAll(extendParams);
+ return allParams;
+ }
}
public void setMetadata(Map<String, String> metadata) {
@@ -249,7 +250,6 @@ public class DefaultServiceInstance implements ServiceInstance {
@Override
public String toString() {
return "DefaultServiceInstance{" +
- "id='" + id + '\'' +
", serviceName='" + serviceName + '\'' +
", host='" + host + '\'' +
", port=" + port +
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
index ee99000..f9c8019 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
@@ -224,6 +224,21 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery {
}
@Override
+ public ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+ return serviceDiscovery.createListener(serviceNames);
+ }
+
+ @Override
+ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+ serviceDiscovery.removeServiceInstancesChangedListener(listener);
+ }
+
+ @Override
+ public long getDelay() {
+ return serviceDiscovery.getDelay();
+ }
+
+ @Override
public URL getUrl() {
return serviceDiscovery.getUrl();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
index 2a51168..1482a9d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
@@ -110,7 +110,7 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
}
private String getServiceInstanceId(ServiceInstance serviceInstance) {
- String id = serviceInstance.getId();
+ String id = serviceInstance.getAddress();
if (StringUtils.isBlank(id)) {
return serviceInstance.getHost() + "." + serviceInstance.getPort();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 1034420..7537090 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -207,7 +207,7 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
@SuppressWarnings("unchecked")
public final void fillServiceInstance(DefaultServiceInstance serviceInstance) {
- String hostId = serviceInstance.getId();
+ String hostId = serviceInstance.getAddress();
if (metadataMap.containsKey(hostId)) {
// Use cached metadata.
// Metadata will be updated by provider callback
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index 90ba196..aa318d4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -219,6 +219,7 @@ public interface ServiceDiscovery extends Prioritized {
/**
* unsubscribe to instances change event.
+ *
* @param listener
* @throws IllegalArgumentException
*/
@@ -226,6 +227,10 @@ public interface ServiceDiscovery extends Prioritized {
throws IllegalArgumentException {
}
+ default ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+ return new ServiceInstancesChangedListener(serviceNames, this);
+ }
+
/**
* Dispatch the {@link ServiceInstancesChangedEvent}
*
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index d19c8de..31ae6f9 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -291,7 +291,7 @@ public class ServiceDiscoveryRegistry implements Registry {
// register ServiceInstancesChangedListener
ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey, k -> {
- ServiceInstancesChangedListener serviceInstancesChangedListener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+ ServiceInstancesChangedListener serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
serviceInstancesChangedListener.setUrl(url);
serviceNames.forEach(serviceName -> {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index c41e028..6232097 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -32,9 +32,10 @@ import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.eclipse.collections.impl.map.mutable.UnifiedMap;
+
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -152,7 +153,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ Map<String, Invoker<T>> newUrlInvokerMap = new UnifiedMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 5bb2bfe..3a55c4e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -29,13 +29,6 @@ import java.util.SortedMap;
public interface ServiceInstance extends Serializable {
/**
- * The id of the registered service instance.
- *
- * @return nullable
- */
- String getId();
-
- /**
* The name of service that current instance belongs to.
*
* @return non-null
@@ -54,7 +47,7 @@ public interface ServiceInstance extends Serializable {
*
* @return the positive integer if present
*/
- Integer getPort();
+ int getPort();
String getAddress();
@@ -87,6 +80,10 @@ public interface ServiceInstance extends Serializable {
SortedMap<String, String> getSortedMetadata();
+ String getRegistryCluster();
+
+ void setRegistryCluster(String registryCluster);
+
Map<String, String> getExtendParams();
Map<String, String> getAllParams();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 22280bd..dd0a0cf 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -53,7 +53,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
-import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
@@ -67,14 +66,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class);
- private final Set<String> serviceNames;
- private final ServiceDiscovery serviceDiscovery;
- private URL url;
- private Map<String, NotifyListener> listeners;
+ protected final Set<String> serviceNames;
+ protected final ServiceDiscovery serviceDiscovery;
+ protected URL url;
+ protected Map<String, NotifyListener> listeners;
private Map<String, List<ServiceInstance>> allInstances;
- private Map<String, List<URL>> serviceUrls;
+ private Map<String, Object> serviceUrls;
private Map<String, MetadataInfo> revisionToMetadata;
@@ -112,8 +111,8 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();
- Map<String, Map<Set<String>, List<URL>>> protocolRevisionsToUrls = new HashMap<>();
- Map<String, List<URL>> newServiceUrls = new HashMap<>();//TODO
+ Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new HashMap<>();
+ Map<String, Object> newServiceUrls = new HashMap<>();//TODO
Map<String, MetadataInfo> newRevisionToMetadata = new HashMap<>();
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
@@ -151,27 +150,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
localServiceToRevisions.forEach((serviceInfo, revisions) -> {
String protocol = serviceInfo.getProtocol();
- Map<Set<String>, List<URL>> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> {
+ Map<Set<String>, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> {
return new HashMap<>();
});
- List<URL> urls = revisionsToUrls.get(revisions);
+ Object urls = revisionsToUrls.get(revisions);
if (urls != null) {
newServiceUrls.put(serviceInfo.getMatchKey(), urls);
} else {
- urls = new ArrayList<>();
- for (String r : revisions) {
- for (ServiceInstance i : revisionToInstances.get(r)) {
- // different protocols may have ports specified in meta
- if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
- DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
- if (endpoint != null && !endpoint.getPort().equals(i.getPort())) {
- urls.add(((DefaultServiceInstance)i).copy(endpoint).toURL());
- break;
- }
- }
- urls.add(i.toURL());
- }
- }
+ urls = getServiceUrlsCache(revisionToInstances, revisions, protocol);
revisionsToUrls.put(revisions, urls);
newServiceUrls.put(serviceInfo.getMatchKey(), urls);
}
@@ -183,7 +169,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
public synchronized void addListenerAndNotify(String serviceKey, NotifyListener listener) {
this.listeners.put(serviceKey, listener);
- List<URL> urls = this.serviceUrls.get(serviceKey);
+ List<URL> urls = getAddresses(serviceKey);
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
}
@@ -197,7 +183,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
}
public List<URL> getUrls(String serviceKey) {
- return toUrlsWithEmpty(serviceUrls.get(serviceKey));
+ return toUrlsWithEmpty(getAddresses(serviceKey));
}
/**
@@ -241,7 +227,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
return serviceNames.contains(event.getServiceName());
}
- private boolean isRetryAndExpired(ServiceInstancesChangedEvent event) {
+ protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) {
String appName = event.getServiceName();
List<ServiceInstance> appInstances = event.getServiceInstances();
@@ -261,13 +247,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
return false;
}
- private boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) {
+ protected boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) {
if (revisionToMetadata == null) {
return false;
}
boolean result = false;
for (Map.Entry<String, MetadataInfo> entry : revisionToMetadata.entrySet()) {
- if (entry.getValue() == null) {
+ if (entry.getValue() == MetadataInfo.EMPTY) {
result = true;
break;
}
@@ -275,31 +261,31 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
return result;
}
- private MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) {
+ protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) {
MetadataInfo metadata = revisionToMetadata.get(revision);
- if (metadata == null) {
- if (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)) {
- metadata = getMetadataInfo(instance);
- if (metadata != null) {
- logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata);
- failureCounter.set(0);
- revisionToMetadata.putIfAbsent(revision, metadata);
- parseMetadata(revision, metadata, localServiceToRevisions);
- } else {
- logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision
- + ", wait for retry.");
- lastFailureTime = System.currentTimeMillis();
- failureCounter.incrementAndGet();
- }
+ if (metadata == null
+ || (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
+ metadata = getMetadataInfo(instance);
+
+ if (metadata != MetadataInfo.EMPTY) {
+ logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata);
+ failureCounter.set(0);
+ revisionToMetadata.putIfAbsent(revision, metadata);
+ parseMetadata(revision, metadata, localServiceToRevisions);
+ } else {
+ logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision
+ + ", wait for retry.");
+ lastFailureTime = System.currentTimeMillis();
+ failureCounter.incrementAndGet();
}
- } else if (subInstances.size() == 1) {
+ } else if (metadata != MetadataInfo.EMPTY && subInstances.size() == 1) {
// "subInstances.size() >= 2" means metadata of this revision has been parsed, ignore
parseMetadata(revision, metadata, localServiceToRevisions);
}
return metadata;
}
- private Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
+ protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), k -> new TreeSet<>());
@@ -309,10 +295,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
return localServiceToRevisions;
}
- private MetadataInfo getMetadataInfo(ServiceInstance instance) {
+ protected MetadataInfo getMetadataInfo(ServiceInstance instance) {
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
// FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry implementation.
- instance.getExtendParams().putIfAbsent(REGISTRY_CLUSTER_KEY, RegistryClusterIdentifier.getExtension(url).consumerKey(url));
+ if (instance.getRegistryCluster() == null) {
+ instance.setRegistryCluster(RegistryClusterIdentifier.getExtension(url).consumerKey(url));
+ }
MetadataInfo metadataInfo;
try {
if (logger.isDebugEnabled()) {
@@ -332,19 +320,46 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
logger.error("Failed to load service metadata, meta type is " + metadataType, e);
metadataInfo = null;
}
+
+ if (metadataInfo == null) {
+ metadataInfo = MetadataInfo.EMPTY;
+ }
return metadataInfo;
}
- private void notifyAddressChanged() {
+ protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol) {
+ List<URL> urls;
+ urls = new ArrayList<>();
+ for (String r : revisions) {
+ for (ServiceInstance i : revisionToInstances.get(r)) {
+ // different protocols may have ports specified in meta
+ if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
+ DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
+ if (endpoint != null && !endpoint.getPort().equals(i.getPort())) {
+ urls.add(((DefaultServiceInstance) i).copy(endpoint).toURL());
+ break;
+ }
+ }
+ urls.add(i.toURL());
+ }
+ }
+ return urls;
+ }
+
+ protected List<URL> getAddresses(String serviceProtocolKey) {
+ return (List<URL>) serviceUrls.get(serviceProtocolKey);
+ }
+
+ protected void notifyAddressChanged() {
listeners.forEach((key, notifyListener) -> {
//FIXME, group wildcard match
- List<URL> urls = toUrlsWithEmpty(serviceUrls.get(key));
+ List<URL> urls = toUrlsWithEmpty(getAddresses(key));
logger.info("Notify service " + key + " with urls " + urls.size());
notifyListener.notify(urls);
});
}
- private List<URL> toUrlsWithEmpty(List<URL> urls) {
+ protected List<URL> toUrlsWithEmpty(List<URL> urls) {
if (urls == null) {
urls = Collections.emptyList();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index 27f66f5..42382f8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -86,7 +86,7 @@ public class MetadataUtils {
}
public static String computeKey(ServiceInstance serviceInstance) {
- return serviceInstance.getServiceName() + "##" + serviceInstance.getId() + "##" +
+ return serviceInstance.getServiceName() + "##" + serviceInstance.getAddress() + "##" +
ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index bca273b..d217e2d 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -85,7 +85,7 @@ public class RemoteMetadataServiceImpl {
SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(),
ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
- String registryCluster = instance.getExtendParams().get(REGISTRY_CLUSTER_KEY);
+ String registryCluster = instance.getRegistryCluster();
checkRemoteConfigured();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
index 30f1ea1..a9c7b8c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java
@@ -39,11 +39,11 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar
@Override
public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker, MigrationRule rule) {
if (!serviceDiscoveryInvoker.hasProxyInvokers()) {
- logger.info("No instance address available, will not migrate.");
+ logger.info("No instance address available, stop compare.");
return false;
}
if (!invoker.hasProxyInvokers()) {
- logger.info("No interface address available, will migrate.");
+ logger.info("No interface address available, stop compare.");
return true;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 15e2434..2e94d3f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.registry.client.migration.model.MigrationStep;
import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
@@ -36,7 +35,6 @@ import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
-import java.util.List;
import java.util.Set;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
@@ -288,6 +286,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private volatile boolean invokersChanged;
+ /**
+ * Need to know which invoker change triggered this compare.
+ */
private synchronized void compareAddresses(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
this.invokersChanged = true;
if (logger.isDebugEnabled()) {
@@ -297,10 +298,10 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) {
logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to APP Level address");
- discardInterfaceInvokerAddress(invoker);
+ destroyInterfaceInvoker(invoker);
} else {
logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to Service Level address");
- discardServiceDiscoveryInvokerAddress(serviceDiscoveryInvoker);
+ destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker);
}
}
@@ -310,26 +311,28 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
}
if (serviceDiscoveryInvoker != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+ if (serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+ }
+ serviceDiscoveryInvoker.destroy();
}
- serviceDiscoveryInvoker.destroy();
}
}
- protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) {
- if (this.invoker != null) {
- this.currentAvailableInvoker = this.invoker;
- updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
- }
- if (serviceDiscoveryInvoker != null) {
- if (logger.isDebugEnabled()) {
- List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
- logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size()));
- }
-// serviceDiscoveryInvoker.getDirectory().discordAddresses();
- }
- }
+// protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) {
+// if (this.invoker != null) {
+// this.currentAvailableInvoker = this.invoker;
+// updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker);
+// }
+// if (serviceDiscoveryInvoker != null) {
+// if (logger.isDebugEnabled()) {
+// List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers();
+// logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size()));
+// }
+//// serviceDiscoveryInvoker.getDirectory().discordAddresses();
+// }
+// }
protected void refreshServiceDiscoveryInvoker() {
clearListener(serviceDiscoveryInvoker);
@@ -338,8 +341,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
}
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
- } else {
- ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).markInvokersChanged();
}
}
@@ -352,8 +353,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
- } else {
- ((DynamicDirectory) invoker.getDirectory()).markInvokersChanged();
}
}
@@ -363,26 +362,28 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
updateConsumerModel(currentAvailableInvoker, invoker);
}
if (invoker != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName());
- }
- invoker.destroy();
- }
- }
-
- protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
- if (this.serviceDiscoveryInvoker != null) {
- this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
- updateConsumerModel(currentAvailableInvoker, invoker);
- }
- if (invoker != null) {
- if (logger.isDebugEnabled()) {
- List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers();
- logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size()));
+ if (invoker.getDirectory().isNotificationReceived()) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName());
+ }
+ invoker.destroy();
}
- //invoker.getDirectory().discordAddresses();
}
}
+//
+// protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
+// if (this.serviceDiscoveryInvoker != null) {
+// this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
+// updateConsumerModel(currentAvailableInvoker, invoker);
+// }
+// if (invoker != null) {
+// if (logger.isDebugEnabled()) {
+// List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers();
+// logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size()));
+// }
+// //invoker.getDirectory().discordAddresses();
+// }
+// }
private void clearListener(ClusterInvoker<T> invoker) {
if (invoker == null) return;
@@ -410,9 +411,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
if (workingInvoker != null) {
consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker", workingInvoker);
}
- if (backInvoker != null) {
- consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker);
- }
+// if (backInvoker != null) {
+// consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker);
+// }
}
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
index 5972e06..b9718f4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java
@@ -30,8 +30,8 @@ import java.util.Set;
@Activate
public class MigrationRuleHandler<T> {
+ public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
- private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration";
private MigrationClusterInvoker<T> migrationInvoker;
private MigrationStep currentStep;
@@ -119,8 +119,10 @@ public class MigrationRuleHandler<T> {
}
if (step == MigrationStep.APPLICATION_FIRST) {
+ setCurrentStepAndThreshold(step, threshold);
migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(false);
} else if (step == MigrationStep.FORCE_APPLICATION) {
+ setCurrentStepAndThreshold(step, threshold);
migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 399a260..85c7349 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -67,20 +67,24 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
public MigrationRuleListener() {
this.configuration = ApplicationModel.getEnvironment().getDynamicConfiguration().orElse(null);
+
+ String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule();
+ String defaultRawRule = StringUtils.isEmpty(localRawRule) ? INIT : localRawRule;
+
if (this.configuration != null) {
logger.info("Listening for migration rules on dataId " + RULE_KEY + ", group " + DUBBO_SERVICEDISCOVERY_MIGRATION);
configuration.addListener(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, this);
String rawRule = configuration.getConfig(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION);
if (StringUtils.isEmpty(rawRule)) {
- rawRule = INIT;
+ rawRule = defaultRawRule;
}
this.rawRule = rawRule;
} else {
if (logger.isWarnEnabled()) {
logger.warn("Using default configuration rule because config center is not configured!");
}
- rawRule = INIT;
+ rawRule = defaultRawRule;
}
// process(new ConfigChangedEvent(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, rawRule));
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 8b29483..c7b0daa 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -251,20 +251,18 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
this.invokersChangedListener = listener;
if (invokersChangedListener != null && invokersChanged) {
invokersChangedListener.onChange();
- invokersChanged = false;
}
}
protected synchronized void invokersChanged() {
invokersChanged = true;
- if (invokersChangedListener != null && invokersChanged) {
+ if (invokersChangedListener != null) {
invokersChangedListener.onChange();
- invokersChanged = false;
}
}
- public synchronized void markInvokersChanged() {
- this.invokersChanged = true;
+ public boolean isNotificationReceived() {
+ return invokersChanged;
}
protected abstract void destroyAllInvokers();
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
index 85b63de..e2909ce 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.registry.client;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static java.lang.String.valueOf;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,7 +36,7 @@ public class DefaultServiceInstanceTest {
public DefaultServiceInstance instance;
public static DefaultServiceInstance createInstance() {
- DefaultServiceInstance instance = new DefaultServiceInstance(valueOf(System.nanoTime()), "A", "127.0.0.1", 8080);
+ DefaultServiceInstance instance = new DefaultServiceInstance("A", "127.0.0.1", 8080);
instance.getMetadata().put("dubbo.metadata-service.urls", "[ \"dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedU [...]
instance.getMetadata().put("dubbo.metadata-service.url-params", "{\"dubbo\":{\"application\":\"dubbo-provider-demo\",\"deprecated\":\"false\",\"group\":\"dubbo-provider-demo\",\"version\":\"1.0.0\",\"timestamp\":\"1564845042651\",\"dubbo\":\"2.0.2\",\"provider.host\":\"192.168.0.102\",\"provider.port\":\"20880\"}}");
return instance;
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
index 8677381..65527df 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URLBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.createInstance;
@@ -29,6 +30,7 @@ import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.create
*
* @since 2.7.5
*/
+@Disabled("FileSystemServiceDiscovery implementation is not stable enough at present")
public class FileSystemServiceDiscoveryTest {
private FileSystemServiceDiscovery serviceDiscovery;
diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
index 084376b..15bf2d1 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java
@@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.DefaultPage;
import org.apache.dubbo.common.utils.Page;
-import org.apache.dubbo.event.ConditionalEventListener;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscoveryFactory;
import org.apache.dubbo.registry.client.ServiceInstance;
@@ -33,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
public class MultipleServiceDiscovery implements ServiceDiscovery {
public static final String REGISTRY_PREFIX_KEY = "child.";
@@ -88,16 +88,23 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
- MultiServiceInstancesChangedListener multiListener = new MultiServiceInstancesChangedListener(listener);
+ MultiServiceInstancesChangedListener multiListener = (MultiServiceInstancesChangedListener) listener;
for (String registryKey : serviceDiscoveries.keySet()) {
- SingleServiceInstancesChangedListener singleListener = new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscoveries.get(registryKey), multiListener);
- multiListener.putSingleListener(registryKey, singleListener);
- serviceDiscoveries.get(registryKey).addServiceInstancesChangedListener(singleListener);
+ ServiceDiscovery serviceDiscovery = serviceDiscoveries.get(registryKey);
+ SingleServiceInstancesChangedListener singleListener = multiListener.getAndComputeIfAbsent(registryKey, k -> {
+ return new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscovery, multiListener);
+ });
+ serviceDiscovery.addServiceInstancesChangedListener(singleListener);
}
}
@Override
+ public ServiceInstancesChangedListener createListener(Set<String> serviceNames) {
+ return new MultiServiceInstancesChangedListener(serviceNames, this);
+ }
+
+ @Override
public Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws NullPointerException, IllegalArgumentException, UnsupportedOperationException {
List<ServiceInstance> serviceInstanceList = new ArrayList<>();
@@ -123,17 +130,12 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
return serviceInstance;
}
- protected static class MultiServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> {
- private final ServiceInstancesChangedListener sourceListener;
- private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap = new ConcurrentHashMap<>();
-
- public MultiServiceInstancesChangedListener(ServiceInstancesChangedListener sourceListener) {
- this.sourceListener = sourceListener;
- }
+ protected static class MultiServiceInstancesChangedListener extends ServiceInstancesChangedListener {
+ private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap;
- @Override
- public boolean accept(ServiceInstancesChangedEvent event) {
- return sourceListener.getServiceNames().contains(event.getServiceName());
+ public MultiServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) {
+ super(serviceNames, serviceDiscovery);
+ this.singleListenerMap = new ConcurrentHashMap<>();
}
@Override
@@ -149,12 +151,16 @@ public class MultipleServiceDiscovery implements ServiceDiscovery {
}
}
- sourceListener.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances));
+ super.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances));
}
public void putSingleListener(String registryKey, SingleServiceInstancesChangedListener singleListener) {
singleListenerMap.put(registryKey, singleListener);
}
+
+ public SingleServiceInstancesChangedListener getAndComputeIfAbsent(String registryKey, Function<String, SingleServiceInstancesChangedListener> func) {
+ return singleListenerMap.computeIfAbsent(registryKey, func);
+ }
}
protected static class SingleServiceInstancesChangedListener extends ServiceInstancesChangedListener {
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
index 7542b5e..abc7467 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java
@@ -57,7 +57,6 @@ public class NacosNamingServiceUtils {
*/
public static Instance toInstance(ServiceInstance serviceInstance) {
Instance instance = new Instance();
- instance.setInstanceId(serviceInstance.getId());
instance.setServiceName(serviceInstance.getServiceName());
instance.setIp(serviceInstance.getHost());
instance.setPort(serviceInstance.getPort());
@@ -75,8 +74,7 @@ public class NacosNamingServiceUtils {
* @since 2.7.5
*/
public static ServiceInstance toServiceInstance(Instance instance) {
- DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getInstanceId(),
- NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort());
+ DefaultServiceInstance serviceInstance = new DefaultServiceInstance(NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort());
serviceInstance.setMetadata(instance.getMetadata());
serviceInstance.setEnabled(instance.isEnabled());
serviceInstance.setHealthy(instance.isHealthy());
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index dce7d20..549b803 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.utils.Page;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.rpc.RpcException;
@@ -40,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.function.ThrowableFunction.execute;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated;
@@ -199,18 +201,28 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
throw new IllegalStateException("registerServiceWatcher create path=" + path + " fail.", e);
}
- CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
- new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener));
- try {
- curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
- } catch (KeeperException.NoNodeException e) {
- // ignored
- if (logger.isErrorEnabled()) {
- logger.error(e.getMessage());
+ CountDownLatch latch = new CountDownLatch(1);
+ ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(path, key -> {
+ ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch);
+ try {
+ curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path);
+ } catch (KeeperException.NoNodeException e) {
+ // ignored
+ if (logger.isErrorEnabled()) {
+ logger.error(e.getMessage());
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ return tmpWatcher;
+ });
+ watcher.addListener(listener);
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName)));
+ latch.countDown();
+ }
+
+ public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher watcher) throws Exception {
+ curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
}
private String buildServicePath(String serviceName) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index e34d3e4..8cffc3e 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -16,8 +16,10 @@
*/
package org.apache.dubbo.registry.zookeeper;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.RegistryNotifier;
import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
@@ -25,6 +27,10 @@ import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
import static org.apache.dubbo.rpc.model.ApplicationModel.getExecutorRepository;
import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged;
import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
@@ -37,7 +43,7 @@ import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
* @since 2.7.5
*/
public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
- private ServiceInstancesChangedListener listener;
+ private Set<ServiceInstancesChangedListener> listeners = new ConcurrentHashSet<>();
private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
@@ -47,34 +53,52 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
private final String serviceName;
+ private final String path;
+
+ private CountDownLatch latch;
+
public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery,
String serviceName,
- ServiceInstancesChangedListener listener) {
+ String path,
+ CountDownLatch latch) {
this.zookeeperServiceDiscovery = zookeeperServiceDiscovery;
this.serviceName = serviceName;
- this.listener = listener;
+ this.path = path;
+ this.latch = latch;
this.notifier = new RegistryNotifier(zookeeperServiceDiscovery.getDelay(), getExecutorRepository().getServiceDiscoveryAddressNotificationExecutor()) {
@Override
protected void doNotify(Object rawAddresses) {
- listener.onEvent((ServiceInstancesChangedEvent)rawAddresses);
+ listeners.forEach(listener -> listener.onEvent((ServiceInstancesChangedEvent)rawAddresses));
}
};
}
@Override
public void process(WatchedEvent event) throws Exception {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ }
Watcher.Event.EventType eventType = event.getType();
if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
if (shouldKeepWatching()) {
- notifier.notify(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
- zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
- zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+ zookeeperServiceDiscovery.reRegisterWatcher(this);
+ List<ServiceInstance> instanceList = zookeeperServiceDiscovery.getInstances(serviceName);
+ notifier.notify(new ServiceInstancesChangedEvent(serviceName, instanceList));
}
}
}
+ public String getPath() {
+ return path;
+ }
+
+ public void addListener(ServiceInstancesChangedListener listener) {
+ listeners.add(listener);
+ }
+
public boolean shouldKeepWatching() {
return keepWatching;
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 278aae7..ed9be54 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -85,7 +85,7 @@ public abstract class CuratorFrameworkUtils {
String host = instance.getAddress();
int port = instance.getPort();
ZookeeperInstance zookeeperInstance = instance.getPayload();
- DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getId(), name, host, port);
+ DefaultServiceInstance serviceInstance = new DefaultServiceInstance(name, host, port);
serviceInstance.setMetadata(zookeeperInstance.getMetadata());
return serviceInstance;
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
index 5f3b680..359d62a 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java
@@ -34,7 +34,6 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.dubbo.common.utils.NetUtils.getAvailablePort;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.INSTANCE_REVISION_UPDATED_KEY;
-import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.generateId;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -104,7 +103,7 @@ public class ZookeeperServiceDiscoveryTest {
}
private DefaultServiceInstance createServiceInstance(String serviceName, String host, int port) {
- return new DefaultServiceInstance(generateId(host, port), serviceName, host, port);
+ return new DefaultServiceInstance(serviceName, host, port);
}
// @Test
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
new file mode 100644
index 0000000..d0b7848
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+public interface BaseFilter {
+ /**
+ * Make sure call invoker.invoke() in your implementation.
+ */
+ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
+
+ interface Listener {
+
+ void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
+
+ void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
index 58e8215..74acb51 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
@@ -33,6 +33,32 @@ import org.apache.dubbo.common.extension.SPI;
* Caching is implemented in dubbo using filter approach. If cache is configured for invocation then before
* remote call configured caching type's (e.g. Thread Local, JCache etc) implementation invoke method gets called.
* </pre>
+ *
+ * Start from 3.0, the semantics of the Filter component at the consumer side has changed.
+ * Instead of intercepting a specific instance of invoker, Filter in 3.0 now intercepts ClusterInvoker. A new SPI named
+ * InstanceFilter is introduced to work as the same semantic as Filter in 2.x.
+ *
+ * The difference of Filter is as follows:
+ *
+ * 3.x Filter
+ *
+ * -> InstanceFilter -> Invoker
+ *
+ * Proxy -> Filter -> Filter -> ClusterInvoker -> InstanceFilter -> Invoker
+ *
+ * -> InstanceFilter -> Invoker
+ *
+ *
+ * 2.x Filter
+ *
+ * Filter -> Filter -> Invoker
+ *
+ * Proxy -> ClusterInvoker -> Filter -> Filter -> Invoker
+ *
+ * Filter -> Filter -> Invoker
+ *
+ * If you want to a Filter
+ *
* Filter. (SPI, Singleton, ThreadSafe)
*
* @see org.apache.dubbo.rpc.filter.GenericFilter
@@ -41,17 +67,5 @@ import org.apache.dubbo.common.extension.SPI;
* @see org.apache.dubbo.rpc.filter.TpsLimitFilter
*/
@SPI
-public interface Filter {
- /**
- * Make sure call invoker.invoke() in your implementation.
- */
- Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
-
- interface Listener {
-
- void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
-
- void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
- }
-
+public interface Filter extends BaseFilter {
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index d7e8c44..33a9b64 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class AbstractInvoker<T> implements Invoker<T> {
- protected final Logger logger = LoggerFactory.getLogger(getClass());
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class);
private final Class<T> type;
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 5255f55..7c526c2 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -3,10 +3,8 @@ generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
-activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
-consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
index ee922af..ad95481 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java
@@ -20,11 +20,11 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.AsyncMethodInfo;
import org.apache.dubbo.rpc.model.ConsumerModel;
@@ -39,7 +39,7 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.ASYNC_METHOD_INFO;
* EventFilter
*/
@Activate(group = CommonConstants.CONSUMER)
-public class FutureFilter implements Filter, Filter.Listener {
+public class FutureFilter implements ClusterFilter, ClusterFilter.Listener {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 79a7a38..ee41594 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -1,2 +1 @@
-trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
-future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
\ No newline at end of file
+trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
new file mode 100644
index 0000000..4783214
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter
@@ -0,0 +1 @@
+future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
index 1a1b0fd..d34f49a 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java
@@ -18,11 +18,11 @@ package org.apache.dubbo.rpc.protocol.dubbo;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
-import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.cluster.filter.ClusterFilter;
import org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock;
*/
public class FutureFilterTest {
private static RpcInvocation invocation;
- private Filter eventFilter = new FutureFilter();
+ private ClusterFilter eventFilter = new FutureFilter();
@BeforeAll
public static void setUp() {