You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by he...@apache.org on 2020/05/08 09:39:51 UTC
[dubbo] branch master updated: Revert "Reduce memory allocation
during address change notification. (#5613)" (#6119)
This is an automated email from the ASF dual-hosted git repository.
hengyunabc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 9b212c2 Revert "Reduce memory allocation during address change notification. (#5613)" (#6119)
9b212c2 is described below
commit 9b212c2d38dc400edc13f321e3bc6314640c727b
Author: hengyunabc <he...@gmail.com>
AuthorDate: Fri May 8 17:39:43 2020 +0800
Revert "Reduce memory allocation during address change notification. (#5613)" (#6119)
This reverts commit 6491ed01b7ba7cdcf2c26934ca806df3a89cf2a9.
---
.../dubbo/rpc/cluster/support/ClusterUtils.java | 84 +++++--
.../src/main/java/org/apache/dubbo/common/URL.java | 263 +++++++++++++++------
.../java/org/apache/dubbo/common/URLBuilder.java | 130 ++++++++--
.../java/org/apache/dubbo/common/URLStrParser.java | 51 +---
.../dubbo/common/constants/CommonConstants.java | 4 -
.../dubbo/common/extension/ExtensionLoader.java | 16 +-
.../manager/DefaultExecutorRepository.java | 8 -
.../threadpool/manager/ExecutorRepository.java | 7 -
.../org/apache/dubbo/common/utils/UrlUtils.java | 12 -
.../org/apache/dubbo/config/RegistryConfig.java | 12 -
.../org/apache/dubbo/common/URLStrParserTest.java | 13 +-
.../test/java/org/apache/dubbo/common/URLTest.java | 20 +-
.../support/AbortPolicyWithReportTest.java | 4 +-
.../org/apache/dubbo/config/ServiceConfig.java | 9 +-
.../src/main/resources/META-INF/compat/dubbo.xsd | 5 -
.../src/main/resources/META-INF/dubbo.xsd | 5 -
.../dubbo/monitor/dubbo/DubboMonitorFactory.java | 10 +-
.../java/org/apache/dubbo/registry/Registry.java | 6 -
.../apache/dubbo/registry/RegistryNotifier.java | 78 ------
.../registry/integration/RegistryDirectory.java | 104 +++-----
.../registry/integration/RegistryProtocol.java | 110 ++++-----
.../dubbo/registry/support/AbstractRegistry.java | 7 +-
.../support/CacheableFailbackRegistry.java | 87 -------
.../dubbo/registry/dubbo/DubboRegistryFactory.java | 6 +-
.../registry/dubbo/RegistryDirectoryTest.java | 2 +-
.../apache/dubbo/registry/etcd/EtcdRegistry.java | 48 ++--
.../apache/dubbo/registry/nacos/NacosRegistry.java | 53 ++---
.../apache/dubbo/registry/redis/RedisRegistry.java | 32 ++-
.../apache/dubbo/registry/sofa/SofaRegistry.java | 61 ++---
.../registry/zookeeper/ZookeeperRegistry.java | 98 +++-----
.../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 2 +-
31 files changed, 614 insertions(+), 733 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
index 2634461..3e5f1d2 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java
@@ -17,22 +17,29 @@
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.Constants;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.ALIVE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INVOKER_LISTENER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TAG_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
-import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmptyMap;
-import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
-import static org.apache.dubbo.remoting.Constants.TRANSPORTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
/**
* ClusterUtils
@@ -43,40 +50,71 @@ public class ClusterUtils {
}
public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
+ Map<String, String> map = new HashMap<String, String>();
Map<String, String> remoteMap = remoteUrl.getParameters();
- if (remoteMap == null || remoteMap.size() <= 0) {
- return remoteUrl.addParameters(localMap);
+ if (remoteMap != null && remoteMap.size() > 0) {
+ map.putAll(remoteMap);
+
+ // Remove configurations from provider, some items should be affected by provider.
+ map.remove(THREAD_NAME_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + THREAD_NAME_KEY);
+
+ map.remove(THREADPOOL_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + THREADPOOL_KEY);
+
+ map.remove(CORE_THREADS_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + CORE_THREADS_KEY);
+
+ map.remove(THREADS_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + THREADS_KEY);
+
+ map.remove(QUEUES_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + QUEUES_KEY);
+
+ map.remove(ALIVE_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + ALIVE_KEY);
+
+ map.remove(Constants.TRANSPORTER_KEY);
+ map.remove(DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);
}
- // Remove configurations from provider, some items should not being affected by provider.
- remoteMap.remove(THREAD_NAME_KEY);
- remoteMap.remove(THREADPOOL_KEY);
- remoteMap.remove(CORE_THREADS_KEY);
- remoteMap.remove(THREADS_KEY);
- remoteMap.remove(QUEUES_KEY);
- remoteMap.remove(ALIVE_KEY);
- remoteMap.remove(TRANSPORTER_KEY);
+ if (localMap != null && localMap.size() > 0) {
+ Map<String, String> copyOfLocalMap = new HashMap<>(localMap);
+
+ if(map.containsKey(GROUP_KEY)){
+ copyOfLocalMap.remove(GROUP_KEY);
+ }
+ if(map.containsKey(VERSION_KEY)){
+ copyOfLocalMap.remove(VERSION_KEY);
+ }
+
+ copyOfLocalMap.remove(RELEASE_KEY);
+ copyOfLocalMap.remove(DUBBO_VERSION_KEY);
+ copyOfLocalMap.remove(METHODS_KEY);
+ copyOfLocalMap.remove(TIMESTAMP_KEY);
+ copyOfLocalMap.remove(TAG_KEY);
- remoteMap.put(REMOTE_APPLICATION_KEY, remoteMap.get(APPLICATION_KEY));
+ map.putAll(copyOfLocalMap);
- if (isNotEmptyMap(localMap)) {
- remoteMap.putAll(localMap);
+ map.put(REMOTE_APPLICATION_KEY, remoteMap.get(APPLICATION_KEY));
// Combine filters and listeners on Provider and Consumer
String remoteFilter = remoteMap.get(REFERENCE_FILTER_KEY);
- String localFilter = localMap.get(REFERENCE_FILTER_KEY);
- if (isNotEmpty(remoteFilter) && isNotEmpty(localFilter)) {
- remoteMap.put(REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);
+ String localFilter = copyOfLocalMap.get(REFERENCE_FILTER_KEY);
+ if (remoteFilter != null && remoteFilter.length() > 0
+ && localFilter != null && localFilter.length() > 0) {
+ map.put(REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);
}
String remoteListener = remoteMap.get(INVOKER_LISTENER_KEY);
- String localListener = localMap.get(INVOKER_LISTENER_KEY);
- if (isNotEmpty(remoteListener) && isNotEmpty(localListener)) {
- remoteMap.put(INVOKER_LISTENER_KEY, remoteListener + "," + localListener);
+ String localListener = copyOfLocalMap.get(INVOKER_LISTENER_KEY);
+ if (remoteListener != null && remoteListener.length() > 0
+ && localListener != null && localListener.length() > 0) {
+ map.put(INVOKER_LISTENER_KEY, remoteListener + "," + localListener);
}
}
- return remoteUrl;
+ return remoteUrl.clearParameters().addParameters(map);
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 22b5b66..e605643 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -33,6 +33,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.HOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
@@ -91,25 +93,25 @@ class URL implements Serializable {
private static final long serialVersionUID = -1985165475234910535L;
- protected String protocol;
+ private final String protocol;
- protected String username;
+ private final String username;
- protected String password;
+ private final String password;
// by default, host to registry
- protected String host;
+ private final String host;
// by default, port to registry
- protected int port;
+ private final int port;
- protected String path;
+ private final String path;
- protected Map<String, String> parameters;
+ private final Map<String, String> parameters;
- // ==== cache ====
+ private final Map<String, Map<String, String>> methodParameters;
- private volatile transient Map<String, Map<String, String>> methodParameters;
+ // ==== cache ====
private volatile transient Map<String, Number> numbers;
@@ -140,6 +142,7 @@ class URL implements Serializable {
this.address = null;
this.path = null;
this.parameters = null;
+ this.methodParameters = null;
}
public URL(String protocol, String host, int port) {
@@ -181,6 +184,17 @@ class URL implements Serializable {
int port,
String path,
Map<String, String> parameters) {
+ this(protocol, username, password, host, port, path, parameters, toMethodParameters(parameters));
+ }
+
+ public URL(String protocol,
+ String username,
+ String password,
+ String host,
+ int port,
+ String path,
+ Map<String, String> parameters,
+ Map<String, Map<String, String>> methodParameters) {
if (StringUtils.isEmpty(username)
&& StringUtils.isNotEmpty(password)) {
throw new IllegalArgumentException("Invalid url, password without username!");
@@ -190,6 +204,7 @@ class URL implements Serializable {
this.password = password;
this.host = host;
this.port = Math.max(port, 0);
+ this.address = getAddress(this.host, this.port);
// trim the beginning "/"
while (path != null && path.startsWith("/")) {
@@ -197,10 +212,12 @@ class URL implements Serializable {
}
this.path = path;
if (parameters == null) {
- this.parameters = new HashMap<>();
+ parameters = new HashMap<>();
} else {
- this.parameters = new HashMap<>(parameters);
+ parameters = new HashMap<>(parameters);
}
+ this.parameters = Collections.unmodifiableMap(parameters);
+ this.methodParameters = Collections.unmodifiableMap(methodParameters);
}
private static String getAddress(String host, int port) {
@@ -208,29 +225,135 @@ class URL implements Serializable {
}
/**
- * parse decoded url string, formatted dubbo://host:port/path?param=value, into strutted URL.
+ * NOTICE: This method allocate too much objects, we can use {@link URLStrParser#parseDecodedStr(String)} instead.
+ * <p>
+ * Parse url string
*
- * @param url, decoded url string
- * @return
+ * @param url URL string
+ * @return URL instance
+ * @see URL
*/
public static URL valueOf(String url) {
- return valueOf(url, false);
+ if (url == null || (url = url.trim()).length() == 0) {
+ throw new IllegalArgumentException("url == null");
+ }
+ String protocol = null;
+ String username = null;
+ String password = null;
+ String host = null;
+ int port = 0;
+ String path = null;
+ Map<String, String> parameters = null;
+ int i = url.indexOf('?'); // separator between body and parameters
+ if (i >= 0) {
+ String[] parts = url.substring(i + 1).split("&");
+ parameters = new HashMap<>();
+ for (String part : parts) {
+ part = part.trim();
+ if (part.length() > 0) {
+ int j = part.indexOf('=');
+ if (j >= 0) {
+ String key = part.substring(0, j);
+ String value = part.substring(j + 1);
+ parameters.put(key, value);
+ // compatible with lower versions registering "default." keys
+ if (key.startsWith(DEFAULT_KEY_PREFIX)) {
+ parameters.putIfAbsent(key.substring(DEFAULT_KEY_PREFIX.length()), value);
+ }
+ } else {
+ parameters.put(part, part);
+ }
+ }
+ }
+ url = url.substring(0, i);
+ }
+ i = url.indexOf("://");
+ if (i >= 0) {
+ if (i == 0) {
+ throw new IllegalStateException("url missing protocol: \"" + url + "\"");
+ }
+ protocol = url.substring(0, i);
+ url = url.substring(i + 3);
+ } else {
+ // case: file:/path/to/file.txt
+ i = url.indexOf(":/");
+ if (i >= 0) {
+ if (i == 0) {
+ throw new IllegalStateException("url missing protocol: \"" + url + "\"");
+ }
+ protocol = url.substring(0, i);
+ url = url.substring(i + 1);
+ }
+ }
+
+ i = url.indexOf('/');
+ if (i >= 0) {
+ path = url.substring(i + 1);
+ url = url.substring(0, i);
+ }
+ i = url.lastIndexOf('@');
+ if (i >= 0) {
+ username = url.substring(0, i);
+ int j = username.indexOf(':');
+ if (j >= 0) {
+ password = username.substring(j + 1);
+ username = username.substring(0, j);
+ }
+ url = url.substring(i + 1);
+ }
+ i = url.lastIndexOf(':');
+ if (i >= 0 && i < url.length() - 1) {
+ if (url.lastIndexOf('%') > i) {
+ // ipv6 address with scope id
+ // e.g. fe80:0:0:0:894:aeec:f37d:23e1%en0
+ // see https://howdoesinternetwork.com/2013/ipv6-zone-id
+ // ignore
+ } else {
+ port = Integer.parseInt(url.substring(i + 1));
+ url = url.substring(0, i);
+ }
+ }
+ if (url.length() > 0) {
+ host = url;
+ }
+
+ return new URL(protocol, username, password, host, port, path, parameters);
}
- /**
- * parse normal or encoded url string into strutted URL:
- * - dubbo://host:port/path?param=value
- * - URL.encode("dubbo://host:port/path?param=value")
- *
- * @param url, url string
- * @param encoded, encoded or decoded
- * @return
- */
- public static URL valueOf(String url, boolean encoded) {
- if (encoded) {
- return URLStrParser.parseEncodedStr(url, false);
+ public static Map<String, Map<String, String>> toMethodParameters(Map<String, String> parameters) {
+ Map<String, Map<String, String>> methodParameters = new HashMap<>();
+ if (parameters == null) {
+ return methodParameters;
+ }
+
+ String methodsString = parameters.get(METHODS_KEY);
+ if (StringUtils.isNotEmpty(methodsString)) {
+ List<String> methods = StringUtils.splitToList(methodsString, ',');
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ String key = entry.getKey();
+ for (int i = 0; i < methods.size(); i++) {
+ String method = methods.get(i);
+ int methodLen = method.length();
+ if (key.length() > methodLen
+ && key.startsWith(method)
+ && key.charAt(methodLen) == '.') {//equals to: key.startsWith(method + '.')
+ String realKey = key.substring(methodLen + 1);
+ URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
+ }
+ }
+ }
+ } else {
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ String key = entry.getKey();
+ int methodSeparator = key.indexOf('.');
+ if (methodSeparator > 0) {
+ String method = key.substring(0, methodSeparator);
+ String realKey = key.substring(methodSeparator + 1);
+ URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
+ }
+ }
}
- return URLStrParser.parseDecodedStr(url, false);
+ return methodParameters;
}
public static URL valueOf(String url, String... reserveParams) {
@@ -444,46 +567,6 @@ class URL implements Serializable {
}
public Map<String, Map<String, String>> getMethodParameters() {
- if (methodParameters == null) {
- methodParameters = initMethodParameters(this.parameters);
- }
- return methodParameters;
- }
-
- private void resetMethodParameters() {
- this.methodParameters = null;
- }
-
- private Map<String, Map<String, String>> initMethodParameters(Map<String, String> parameters) {
- Map<String, Map<String, String>> methodParameters = new HashMap<>();
- if (parameters == null) {
- return methodParameters;
- }
-
- String methodsString = parameters.get(METHODS_KEY);
- if (StringUtils.isNotEmpty(methodsString)) {
- String[] methods = methodsString.split(",");
- for (Map.Entry<String, String> entry : parameters.entrySet()) {
- String key = entry.getKey();
- for (String method : methods) {
- String methodPrefix = method + '.';
- if (key.startsWith(methodPrefix)) {
- String realKey = key.substring(methodPrefix.length());
- URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
- }
- }
- }
- } else {
- for (Map.Entry<String, String> entry : parameters.entrySet()) {
- String key = entry.getKey();
- int methodSeparator = key.indexOf('.');
- if (methodSeparator > 0) {
- String method = key.substring(0, methodSeparator);
- String realKey = key.substring(methodSeparator + 1);
- URL.putMethodParameter(method, realKey, entry.getValue(), methodParameters);
- }
- }
- }
return methodParameters;
}
@@ -711,7 +794,7 @@ class URL implements Serializable {
}
public String getMethodParameter(String method, String key) {
- Map<String, String> keyMap = getMethodParameters().get(method);
+ Map<String, String> keyMap = methodParameters.get(method);
String value = null;
if (keyMap != null) {
value = keyMap.get(key);
@@ -1009,6 +1092,40 @@ class URL implements Serializable {
return new URL(protocol, username, password, host, port, path, map);
}
+
+ public URL addMethodParameter(String method, String key, String value) {
+ if (StringUtils.isEmpty(method)
+ || StringUtils.isEmpty(key)
+ || StringUtils.isEmpty(value)) {
+ return this;
+ }
+
+ Map<String, String> map = new HashMap<>(getParameters());
+ map.put(method + "." + key, value);
+ Map<String, Map<String, String>> methodMap = toMethodParameters(map);
+ URL.putMethodParameter(method, key, value, methodMap);
+
+ return new URL(protocol, username, password, host, port, path, map, methodMap);
+ }
+
+ public URL addMethodParameterIfAbsent(String method, String key, String value) {
+ if (StringUtils.isEmpty(method)
+ || StringUtils.isEmpty(key)
+ || StringUtils.isEmpty(value)) {
+ return this;
+ }
+ if (hasMethodParameter(method, key)) {
+ return this;
+ }
+
+ Map<String, String> map = new HashMap<>(getParameters());
+ map.put(method + "." + key, value);
+ Map<String, Map<String, String>> methodMap = toMethodParameters(map);
+ URL.putMethodParameter(method, key, value, methodMap);
+
+ return new URL(protocol, username, password, host, port, path, map, methodMap);
+ }
+
/**
* Add parameters to a new url.
*
@@ -1040,11 +1157,9 @@ class URL implements Serializable {
return this;
}
- Map<String, String> srcParams = getParameters();
- Map<String, String> newMap = new HashMap<>((int) ((srcParams.size() + parameters.size()) / 0.75 + 1));
- newMap.putAll(srcParams);
- newMap.putAll(parameters);
- return new URL(protocol, username, password, host, port, path, newMap);
+ Map<String, String> map = new HashMap<>(getParameters());
+ map.putAll(parameters);
+ return new URL(protocol, username, password, host, port, path, map);
}
public URL addParametersIfAbsent(Map<String, String> parameters) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
index aad6e1c..20c6c60 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLBuilder.java
@@ -24,10 +24,34 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-public final class URLBuilder extends URL {
+public final class URLBuilder {
+ private String protocol;
+
+ private String username;
+
+ private String password;
+
+ // by default, host to registry
+ private String host;
+
+ // by default, port to registry
+ private int port;
+
+ private String path;
+
+ private Map<String, String> parameters;
+
+ private Map<String, Map<String, String>> methodParameters;
public URLBuilder() {
- super();
+ protocol = null;
+ username = null;
+ password = null;
+ host = null;
+ port = 0;
+ path = null;
+ parameters = new HashMap<>();
+ methodParameters = new HashMap<>();
}
public URLBuilder(String protocol, String host, int port) {
@@ -60,7 +84,24 @@ public final class URLBuilder extends URL {
String host,
int port,
String path, Map<String, String> parameters) {
- super(protocol, username, password, host, port, path, parameters);
+ this(protocol, username, password, host, port, path, parameters, URL.toMethodParameters(parameters));
+ }
+
+ public URLBuilder(String protocol,
+ String username,
+ String password,
+ String host,
+ int port,
+ String path, Map<String, String> parameters,
+ Map<String, Map<String, String>> methodParameters) {
+ this.protocol = protocol;
+ this.username = username;
+ this.password = password;
+ this.host = host;
+ this.port = port;
+ this.path = path;
+ this.parameters = parameters != null ? parameters : new HashMap<>();
+ this.methodParameters = (methodParameters != null ? methodParameters : new HashMap<>());
}
public static URLBuilder from(URL url) {
@@ -71,6 +112,7 @@ public final class URLBuilder extends URL {
int port = url.getPort();
String path = url.getPath();
Map<String, String> parameters = new HashMap<>(url.getParameters());
+ Map<String, Map<String, String>> methodParameters = new HashMap<>(url.getMethodParameters());
return new URLBuilder(
protocol,
username,
@@ -78,7 +120,8 @@ public final class URLBuilder extends URL {
host,
port,
path,
- parameters);
+ parameters,
+ methodParameters);
}
public URL build() {
@@ -98,13 +141,14 @@ public final class URLBuilder extends URL {
path = path.substring(firstNonSlash);
}
}
-
- URL url = new URL(protocol, username, password, host, port, path);
- url.parameters = this.parameters;
-
- return url;
+ if (CollectionUtils.isEmptyMap(methodParameters)) {
+ return new URL(protocol, username, password, host, port, path, parameters);
+ } else {
+ return new URL(protocol, username, password, host, port, path, parameters, methodParameters);
+ }
}
+
public URLBuilder setProtocol(String protocol) {
this.protocol = protocol;
return this;
@@ -219,6 +263,14 @@ public final class URLBuilder extends URL {
return this;
}
+ public URLBuilder addMethodParameter(String method, String key, String value) {
+ if (StringUtils.isEmpty(method) || StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+ return this;
+ }
+ URL.putMethodParameter(method, key, value, methodParameters);
+ return this;
+ }
+
public URLBuilder addParameterIfAbsent(String key, String value) {
if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
return this;
@@ -230,6 +282,17 @@ public final class URLBuilder extends URL {
return this;
}
+ public URLBuilder addMethodParameterIfAbsent(String method, String key, String value) {
+ if (StringUtils.isEmpty(method) || StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+ return this;
+ }
+ if (hasMethodParameter(method, key)) {
+ return this;
+ }
+ URL.putMethodParameter(method, key, value, methodParameters);
+ return this;
+ }
+
public URLBuilder addParameters(Map<String, String> parameters) {
if (CollectionUtils.isEmptyMap(parameters)) {
return this;
@@ -253,6 +316,15 @@ public final class URLBuilder extends URL {
return this;
}
+ public URLBuilder addMethodParameters(Map<String, Map<String, String>> methodParameters) {
+ if (CollectionUtils.isEmptyMap(methodParameters)) {
+ return this;
+ }
+
+ this.methodParameters.putAll(methodParameters);
+ return this;
+ }
+
public URLBuilder addParametersIfAbsent(Map<String, String> parameters) {
if (CollectionUtils.isEmptyMap(parameters)) {
return this;
@@ -317,19 +389,39 @@ public final class URLBuilder extends URL {
return StringUtils.isNotEmpty(value);
}
+ public boolean hasMethodParameter(String method, String key) {
+ if (method == null) {
+ String suffix = "." + key;
+ for (String fullKey : parameters.keySet()) {
+ if (fullKey.endsWith(suffix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ if (key == null) {
+ String prefix = method + ".";
+ for (String fullKey : parameters.keySet()) {
+ if (fullKey.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ String value = getMethodParameter(method, key);
+ return value != null && value.length() > 0;
+ }
+
public String getParameter(String key) {
return parameters.get(key);
}
- /**
- * Parse url string
- *
- * @param url URL string
- * @return URL instance
- * @see URL
- */
- public static URLBuilder valueOf(String url) {
- return (URLBuilder) URLStrParser.parseDecodedStr(url, true);
+ public String getMethodParameter(String method, String key) {
+ Map<String, String> keyMap = methodParameters.get(method);
+ String value = null;
+ if (keyMap != null) {
+ value = keyMap.get(key);
+ }
+ return value;
}
-
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
index ccf3edf..37afb78 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java
@@ -20,7 +20,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX;
import static org.apache.dubbo.common.utils.StringUtils.EMPTY_STRING;
import static org.apache.dubbo.common.utils.StringUtils.decodeHexByte;
import static org.apache.dubbo.common.utils.Utf8Utils.decodeUtf8;
@@ -35,16 +34,12 @@ public final class URLStrParser {
//empty
}
- public static URL parseDecodedStr(String decodedURLStr) {
- return parseDecodedStr(decodedURLStr, false);
- }
-
/**
* @param decodedURLStr : after {@link URL#decode} string
* decodedURLStr format: protocol://username:password@host:port/path?k1=v1&k2=v2
* [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
*/
- public static URL parseDecodedStr(String decodedURLStr, boolean modifiable) {
+ public static URL parseDecodedStr(String decodedURLStr) {
Map<String, String> parameters = null;
int pathEndIdx = decodedURLStr.indexOf('?');
if (pathEndIdx >= 0) {
@@ -54,7 +49,7 @@ public final class URLStrParser {
}
String decodedBody = decodedURLStr.substring(0, pathEndIdx);
- return parseURLBody(decodedURLStr, decodedBody, parameters, modifiable);
+ return parseURLBody(decodedURLStr, decodedBody, parameters);
}
private static Map<String, String> parseDecodedParams(String str, int from) {
@@ -97,7 +92,7 @@ public final class URLStrParser {
* @param parameters :
* @return URL
*/
- private static URL parseURLBody(String fullURLStr, String decodedBody, Map<String, String> parameters, boolean modifiable) {
+ private static URL parseURLBody(String fullURLStr, String decodedBody, Map<String, String> parameters) {
int starIdx = 0, endIdx = decodedBody.length();
String protocol = null;
int protoEndIdx = decodedBody.indexOf("://");
@@ -154,25 +149,15 @@ public final class URLStrParser {
if (endIdx > starIdx) {
host = decodedBody.substring(starIdx, endIdx);
}
-
- if (modifiable) {
- return new URLBuilder(protocol, username, password, host, port, path, parameters);
- } else {
- return new URL(protocol, username, password, host, port, path, parameters);
- }
- }
-
- public static URL parseEncodedStr(String encodedURLStr) {
- return parseEncodedStr(encodedURLStr, false);
+ return new URL(protocol, username, password, host, port, path, parameters);
}
-
/**
* @param encodedURLStr : after {@link URL#encode(String)} string
* encodedURLStr after decode format: protocol://username:password@host:port/path?k1=v1&k2=v2
* [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
*/
- public static URL parseEncodedStr(String encodedURLStr, boolean modifiable) {
+ public static URL parseEncodedStr(String encodedURLStr) {
Map<String, String> parameters = null;
int pathEndIdx = encodedURLStr.indexOf("%3F");// '?'
if (pathEndIdx >= 0) {
@@ -183,7 +168,7 @@ public final class URLStrParser {
//decodedBody format: [protocol://][username:password@][host:port]/[path]
String decodedBody = decodeComponent(encodedURLStr, 0, pathEndIdx, false, DECODE_TEMP_BUF.get());
- return parseURLBody(encodedURLStr, decodedBody, parameters, modifiable);
+ return parseURLBody(encodedURLStr, decodedBody, parameters);
}
private static Map<String, String> parseEncodedParams(String str, int from) {
@@ -240,30 +225,12 @@ public final class URLStrParser {
if (isEncoded) {
String name = decodeComponent(str, nameStart, valueStart - 3, false, tempBuf);
- String value;
- if (valueStart == valueEnd) {
- value = name;
- } else {
- value = decodeComponent(str, valueStart, valueEnd, false, tempBuf);
- }
+ String value = decodeComponent(str, valueStart, valueEnd, false, tempBuf);
params.put(name, value);
- // compatible with lower versions registering "default." keys
- if (name.startsWith(DEFAULT_KEY_PREFIX)) {
- params.putIfAbsent(name.substring(DEFAULT_KEY_PREFIX.length()), value);
- }
} else {
- String name = str.substring(nameStart, valueStart - 1);
- String value;
- if (valueStart == valueEnd) {
- value = name;
- } else {
- value = str.substring(valueStart, valueEnd);
- }
+ String name = str.substring(nameStart, valueStart -1);
+ String value = str.substring(valueStart, valueEnd);
params.put(name, value);
- // compatible with lower versions registering "default." keys
- if (name.startsWith(DEFAULT_KEY_PREFIX)) {
- params.putIfAbsent(name.substring(DEFAULT_KEY_PREFIX.length()), value);
- }
}
return true;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index defa559..27c5744 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -310,8 +310,4 @@ public interface CommonConstants {
String SSL_ENABLED_KEY = "ssl-enabled";
- String REGISTRY_SNAPSHOT_KEY = "snapshot";
-
- String REGISTRY_DELAY_NOTIFICATION_KEY = "delay-notification";
-
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
index bfea952..cf34202 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java
@@ -336,27 +336,15 @@ public class ExtensionLoader<T> {
for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
- if (isMatch(key, keyValue, k, v)) {
+ if ((k.equals(key) || k.endsWith("." + key))
+ && ((keyValue != null && keyValue.equals(v)) || (keyValue == null && ConfigUtils.isNotEmpty(v)))) {
return true;
}
}
-
- for (Map.Entry<String, Map<String, String>> entry : url.getMethodParameters().entrySet()) {
- Map<String, String> methodKeyValues = entry.getValue();
- for (Map.Entry<String, String> methodEntry : methodKeyValues.entrySet()) {
- String k = methodEntry.getKey();
- String v = methodEntry.getValue();
- return isMatch(key, keyValue, k, v);
- }
- }
}
return false;
}
- private boolean isMatch(String key, String value, String k, String v) {
- return k.equals(key) && ((value != null && value.equals(v)) || (value == null && ConfigUtils.isNotEmpty(v)));
- }
-
/**
* Get extension's instance. Return <code>null</code> if extension is not found or is not initialized. Pls. note
* that this method will not trigger extension load.
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index 3df98d5..dd37bff 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -50,8 +50,6 @@ public class DefaultExecutorRepository implements ExecutorRepository {
private ScheduledExecutorService serviceExporterExecutor;
- public ScheduledExecutorService registryNotificationExecutor;
-
private ScheduledExecutorService reconnectScheduledExecutor;
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
@@ -64,7 +62,6 @@ public class DefaultExecutorRepository implements ExecutorRepository {
//
// reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
- registryNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-registry-notification"));
}
/**
@@ -158,11 +155,6 @@ public class DefaultExecutorRepository implements ExecutorRepository {
}
@Override
- public ScheduledExecutorService getRegistryNotificationExecutor() {
- return registryNotificationExecutor;
- }
-
- @Override
public ExecutorService getSharedExecutor() {
return SHARED_EXECUTOR;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index dd91442..af3b110 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -58,13 +58,6 @@ public interface ExecutorRepository {
ScheduledExecutorService getServiceExporterExecutor();
/**
- * Scheduled executor handle registry notification.
- *
- * @return
- */
- ScheduledExecutorService getRegistryNotificationExecutor();
-
- /**
* Get the default shared threadpool.
*
* @return
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
index 797f09b..d45ada1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.common.utils;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.RemotingConstants;
import java.util.ArrayList;
@@ -554,15 +553,4 @@ public class UrlUtils {
arr[1] = serviceKey;
return arr;
}
-
- public static URLBuilder newModifiableUrl(URL url) {
- return URLBuilder.from(url);
- }
-
- public static URL unmodifiableUrl(URL url) {
- if (url instanceof URLBuilder) {
- return ((URLBuilder) url).build();
- }
- return url;
- }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
index 7411205..3b868fe 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.config.support.Parameter;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_DELAY_NOTIFICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SHUTDOWN_WAIT_KEY;
import static org.apache.dubbo.config.Constants.REGISTRIES_SUFFIX;
@@ -181,8 +180,6 @@ public class RegistryConfig extends AbstractConfig {
*/
private Integer weight;
- private Integer lazyNotification;
-
public RegistryConfig() {
}
@@ -500,15 +497,6 @@ public class RegistryConfig extends AbstractConfig {
this.weight = weight;
}
- @Parameter(key = REGISTRY_DELAY_NOTIFICATION_KEY)
- public Integer getLazyNotification() {
- return lazyNotification;
- }
-
- public void setLazyNotification(Integer lazyNotification) {
- this.lazyNotification = lazyNotification;
- }
-
@Override
public void refresh() {
super.refresh();
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
index db3c657..3ca62ce 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/URLStrParserTest.java
@@ -21,12 +21,15 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+/**
+ * Created by LinShunkang on 2020/03/12
+ */
public class URLStrParserTest {
@Test
public void test() {
String str = "dubbo%3A%2F%2Fadmin%3Aadmin123%40192.168.1.41%3A28113%2Forg.test.api.DemoService%24Iface%3Fanyhost%3Dtrue%26application%3Ddemo-service%26dubbo%3D2.6.1%26generic%3Dfalse%26interface%3Dorg.test.api.DemoService%24Iface%26methods%3DorbCompare%2CcheckText%2CcheckPicture%26pid%3D65557%26revision%3D1.4.17%26service.filter%3DbootMetrics%26side%3Dprovider%26status%3Dserver%26threads%3D200%26timestamp%3D1583136298859%26version%3D1.0.0";
- System.out.println(URLStrParser.parseEncodedStr(str, false));
+ System.out.println(URLStrParser.parseEncodedStr(str));
String decodeStr = URL.decode(str);
URL originalUrl = URL.valueOf(decodeStr);
@@ -34,12 +37,4 @@ public class URLStrParserTest {
assertThat(URLStrParser.parseDecodedStr(decodeStr), equalTo(originalUrl));
}
- @Test
- public void testNoValue() {
- String str = URL.encode("http://1.2.3.4:8080/path?k0=&k1=v1");
-
- System.out.println(URLStrParser.parseEncodedStr(str, false));
-
- }
-
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
index 52ee650..db5f57b 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/URLTest.java
@@ -274,7 +274,7 @@ public class URLTest {
assertEquals("1.0.0", url.getParameter("version"));
assertEquals("morgan", url.getParameter("application"));
- url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?version=1.0.0&application=morgan&noValue=");
+ url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?version=1.0.0&application=morgan&noValue");
assertURLStrDecoder(url);
assertEquals("dubbo", url.getProtocol());
assertEquals("admin", url.getUsername());
@@ -300,7 +300,7 @@ public class URLTest {
@Test
public void test_noValueKey() throws Exception {
- URL url = URL.valueOf("http://1.2.3.4:8080/path?k0=&k1=v1");
+ URL url = URL.valueOf("http://1.2.3.4:8080/path?k0&k1=v1");
assertURLStrDecoder(url);
assertTrue(url.hasParameter("k0"));
@@ -874,20 +874,4 @@ public class URLTest {
url = URL.valueOf("dubbo://10.20.130.230:20880/path");
assertURLStrDecoder(url);
}
-
- @Test
- public void testEquals() {
- URL url1 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
- URL url2 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
- Assertions.assertEquals(url1, url2);
-
- URL url3 = URL.valueOf("10.20.130.230:20881/context/path?interface=org.apache.dubbo.test.interfaceName&group=group&version=1.0.0");
- Assertions.assertNotEquals(url1, url3);
-
- URL url4 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&weight=10&group=group&version=1.0.0");
- Assertions.assertNotEquals(url1, url4);
-
- URL url5 = URL.valueOf("10.20.130.230:20880/context/path?interface=org.apache.dubbo.test.interfaceName&weight=10&group=group&version=1.0.0");
- Assertions.assertEquals(url4, url5);
- }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
index ea54e9f..ed737ef 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReportTest.java
@@ -17,7 +17,7 @@
package org.apache.dubbo.common.threadpool.support;
import org.apache.dubbo.common.URL;
-
+import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
import org.junit.jupiter.api.Test;
import java.util.concurrent.Executors;
@@ -27,7 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class AbortPolicyWithReportTest {
@Test
public void jStackDumpTest() throws InterruptedException {
- URL url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?dump.directory=/tmp&version=1.0.0&application=morgan&noValue=");
+ URL url = URL.valueOf("dubbo://admin:hello1234@10.20.130.230:20880/context/path?dump.directory=/tmp&version=1.0.0&application=morgan&noValue");
AbortPolicyWithReport abortPolicyWithReport = new AbortPolicyWithReport("Test", url);
try {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index a147874..0792d12 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.config.annotation.Service;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.config.event.ServiceConfigExportedEvent;
import org.apache.dubbo.config.event.ServiceConfigUnexportedEvent;
+import org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker;
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
import org.apache.dubbo.event.Event;
@@ -486,7 +487,9 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
- Exporter<?> exporter = PROTOCOL.export(invoker);
+ DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
+
+ Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
@@ -494,7 +497,9 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
- Exporter<?> exporter = PROTOCOL.export(invoker);
+ DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
+
+ Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index aa1fe74..1805b2a 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -623,11 +623,6 @@
<xsd:documentation><![CDATA[ Is this registry the preferred one. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="delay-notification" type="xsd:int">
- <xsd:annotation>
- <xsd:documentation><![CDATA[ The maximum time to wait before notification. ]]></xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
<xsd:attribute name="weight" type="xsd:integer">
<xsd:annotation>
<xsd:documentation><![CDATA[ weight of registry. ]]></xsd:documentation>
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 3248e9c..bfaaf3f 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -617,11 +617,6 @@
<xsd:documentation><![CDATA[ Is this registry the preferred one. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="delay-notification" type="xsd:int">
- <xsd:annotation>
- <xsd:documentation><![CDATA[ The maximum time to wait before notification. ]]></xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
<xsd:attribute name="weight" type="xsd:integer">
<xsd:annotation>
<xsd:documentation><![CDATA[ weight of registry. ]]></xsd:documentation>
diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
index b39199e..3ca881f 100644
--- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
+++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java
@@ -50,12 +50,6 @@ public class DubboMonitorFactory extends AbstractMonitorFactory {
@Override
protected Monitor createMonitor(URL url) {
- Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, buildMonitorURL(url));
- MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
- return new DubboMonitor(monitorInvoker, monitorService);
- }
-
- private URL buildMonitorURL(URL url) {
URLBuilder urlBuilder = URLBuilder.from(url);
urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
if (StringUtils.isEmpty(url.getPath())) {
@@ -69,7 +63,9 @@ public class DubboMonitorFactory extends AbstractMonitorFactory {
}
urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
REFERENCE_FILTER_KEY, filter + "-monitor");
- return urlBuilder.build();
+ Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
+ MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
+ return new DubboMonitor(monitorInvoker, monitorService);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
index 084deff..d5b3dbc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Registry.java
@@ -19,8 +19,6 @@ package org.apache.dubbo.registry;
import org.apache.dubbo.common.Node;
import org.apache.dubbo.common.URL;
-import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_DELAY_NOTIFICATION_KEY;
-
/**
* Registry. (SPI, Prototype, ThreadSafe)
*
@@ -28,10 +26,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_DELAY_N
* @see org.apache.dubbo.registry.support.AbstractRegistry
*/
public interface Registry extends Node, RegistryService {
- default int getDelay() {
- return getUrl().getParameter(REGISTRY_DELAY_NOTIFICATION_KEY, -1);
- }
-
default void reExportRegister(URL url) {
register(url);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java
deleted file mode 100644
index afb640a..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/RegistryNotifier.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry;
-
-import org.apache.dubbo.common.extension.ExtensionLoader;
-import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public abstract class RegistryNotifier {
-
- private volatile long lastExecuteTime;
- private volatile long lastEventTime;
-
- private Object rawAddresses;
- private Registry registry;
-
- private ScheduledExecutorService scheduler = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
- .getDefaultExtension().getRegistryNotificationExecutor();
-
- public Registry getRegistry() {
- return registry;
- }
-
- public RegistryNotifier(Registry registry) {
- this.registry = registry;
- }
-
- public void notify(Object rawAddresses) {
- this.rawAddresses = rawAddresses;
- long notifyTime = System.currentTimeMillis();
- this.lastEventTime = notifyTime;
-
- int delayTime = getRegistry().getDelay();
- long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime;
- if (delta >= 0) {
- scheduler.submit(new NotificationTask(this, notifyTime));
- } else {
- scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS);
- }
- }
-
- protected abstract void doNotify(Object rawAddresses);
-
- public class NotificationTask implements Runnable {
- private RegistryNotifier listener;
- private long time;
-
- public NotificationTask(RegistryNotifier listener, long time) {
- this.listener = listener;
- this.time = time;
- }
-
- @Override
- public void run() {
- if (this.time == listener.lastEventTime) {
- listener.doNotify(listener.rawAddresses);
- listener.lastExecuteTime = System.currentTimeMillis();
- }
- }
- }
-
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 14e26ea..736108f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -23,7 +23,6 @@ import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
@@ -64,18 +63,11 @@ import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
-import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PREFERRED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TAG_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY;
@@ -85,7 +77,6 @@ import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGO
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
-import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
@@ -93,6 +84,7 @@ import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY;
@@ -111,7 +103,6 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private final String serviceKey; // Initialization at construction time, assertion not null
private final Class<T> serviceType; // Initialization at construction time, assertion not null
private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
- private final Map<String, String> mergeMap;
private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private final boolean multiGroup;
private Protocol protocol; // Initialization at the time of injection, the assertion is not null
@@ -133,17 +124,17 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
// Map<url, Invoker> cache service url to invoker mapping.
- private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+ private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private volatile List<Invoker<T>> invokers;
// Set<invokerUrls> cache invokeUrls to invokers mapping.
- private volatile List<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+ private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
- public RegistryDirectory(Class<T> serviceType, URL url, Map<String, String> parameters) {
+ public RegistryDirectory(Class<T> serviceType, URL url) {
super(url);
if (serviceType == null) {
throw new IllegalArgumentException("service type is null.");
@@ -156,37 +147,19 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
}
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
- this.queryMap = parameters;
- this.mergeMap = genMergeMap(parameters);
+ this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(GROUP_KEY, "");
this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
}
- private Map<String, String> genMergeMap(Map<String, String> parameters) {
- Map<String, String> copyOfParameters = new HashMap<>(parameters);
- copyOfParameters.remove(GROUP_KEY);
- copyOfParameters.remove(VERSION_KEY);
- copyOfParameters.remove(RELEASE_KEY);
- copyOfParameters.remove(DUBBO_VERSION_KEY);
- copyOfParameters.remove(METHODS_KEY);
- copyOfParameters.remove(TIMESTAMP_KEY);
- copyOfParameters.remove(TAG_KEY);
- return copyOfParameters;
- }
-
private URL turnRegistryUrlToConsumerUrl(URL url) {
- // save any parameter in registry that will be useful to the new url.
- URLBuilder builder = URLBuilder.from(url)
+ return URLBuilder.from(url)
.setPath(url.getServiceInterface())
.clearParameters()
.addParameters(queryMap)
- .removeParameter(MONITOR_KEY);
- String isDefault = url.getParameter(PREFERRED_KEY);
- if (StringUtils.isNotEmpty(isDefault)) {
- builder.addParameter(REGISTRY_KEY + "." + PREFERRED_KEY, isDefault);
- }
- return builder.build();
+ .removeParameter(MONITOR_KEY)
+ .build();
}
public void setProtocol(Protocol protocol) {
@@ -310,6 +283,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
*
* @param invokerUrls this parameter can't be null
*/
+ // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
@@ -322,21 +296,20 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
- Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
-
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
- invokerUrls = this.cachedInvokerUrls;
+ invokerUrls.addAll(this.cachedInvokerUrls);
} else {
- this.cachedInvokerUrls = invokerUrls;//Cached invoker urls, convenient for comparison
+ this.cachedInvokerUrls = new HashSet<>();
+ this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
-
if (invokerUrls.isEmpty()) {
return;
}
- Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+ Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
@@ -428,21 +401,18 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* @param urls
* @return invokers
*/
- private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
- Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
+ Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
- Set<URL> keys = new HashSet<>();
- String[] acceptProtocols = null;
+ Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
- if (queryProtocols != null && queryProtocols.length() > 0) {
- acceptProtocols = queryProtocols.split(",");
- }
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
- if (ArrayUtils.isNotEmpty(acceptProtocols)) {
+ if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
+ String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
@@ -463,16 +433,16 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
- URL url = UrlUtils.unmodifiableUrl(mergeUrl(providerUrl));
+ URL url = mergeUrl(providerUrl);
- if (keys.contains(url)) { // Repeated url
+ String key = url.toFullString(); // The parameter urls are sorted
+ if (keys.contains(key)) { // Repeated url
continue;
}
- keys.add(url);
- // Cache key is url that does not merge with consumer side parameters,
- // regardless of how the consumer combines parameters, if the server url changes, then refer again
- Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
- Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url);
+ keys.add(key);
+ // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
+ Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
@@ -488,10 +458,10 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(url, invoker);
+ newUrlInvokerMap.put(key, invoker);
}
} else {
- newUrlInvokerMap.put(url, invoker);
+ newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
@@ -505,7 +475,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* @return
*/
private URL mergeUrl(URL providerUrl) {
- providerUrl = ClusterUtils.mergeUrl(providerUrl, mergeMap); // Merge the consumer side parameters
+ providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
providerUrl = overrideWithConfigurator(providerUrl);
@@ -561,7 +531,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* Close all invokers
*/
private void destroyAllInvokers() {
- Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -582,16 +552,16 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
- List<URL> deleted = null;
+ List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
- for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
@@ -602,7 +572,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
}
if (deleted != null) {
- for (URL url : deleted) {
+ for (String url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
@@ -679,7 +649,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
if (isDestroyed()) {
return false;
}
- Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
+ Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
@@ -697,7 +667,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
/**
* Haomin: added for test purpose
*/
- public Map<URL, Invoker<T>> getUrlInvokerMap() {
+ public Map<String, Invoker<T>> getUrlInvokerMap() {
return urlInvokerMap;
}
@@ -724,7 +694,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private void overrideDirectoryUrl() {
// merge override parameters
- this.overrideDirectoryUrl = UrlUtils.newModifiableUrl(directoryUrl);
+ this.overrideDirectoryUrl = directoryUrl;
List<Configurator> localConfigurators = this.configurators; // local reference
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index c706899..76da303 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -48,7 +48,7 @@ import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.protocol.InvokerWrapper;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,13 +67,19 @@ import static org.apache.dubbo.common.constants.CommonConstants.HIDE_KEY_PREFIX;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
+import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
-import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
@@ -89,13 +95,15 @@ import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
+import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
+import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
-import static org.apache.dubbo.remoting.Constants.PAYLOAD_KEY;
import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
+import static org.apache.dubbo.rpc.Constants.INTERFACES;
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
@@ -110,7 +118,7 @@ public class RegistryProtocol implements Protocol {
public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = {
APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
- WEIGHT_KEY, DUBBO_VERSION_KEY, RELEASE_KEY, PAYLOAD_KEY
+ WEIGHT_KEY, TIMESTAMP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
};
public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = {
@@ -197,12 +205,11 @@ public class RegistryProtocol implements Protocol {
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
- providerUrl = UrlUtils.unmodifiableUrl(providerUrl);
-
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
+ final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
@@ -215,7 +222,6 @@ public class RegistryProtocol implements Protocol {
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
- final Registry registry = registryFactory.getRegistry(registryUrl);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
@@ -246,6 +252,7 @@ public class RegistryProtocol implements Protocol {
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
+
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
@@ -351,12 +358,12 @@ public class RegistryProtocol implements Protocol {
}
protected URL getRegistryUrl(Invoker<?> originInvoker) {
- URLBuilder builder = URLBuilder.from(originInvoker.getUrl());
- if (REGISTRY_PROTOCOL.equals(builder.getProtocol())) {
- String protocol = builder.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
- builder.setProtocol(protocol).removeParameter(REGISTRY_KEY);
+ URL registryUrl = originInvoker.getUrl();
+ if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
+ String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
+ registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
- return builder.build();
+ return registryUrl;
}
protected URL getRegistryUrl(URL url) {
@@ -374,31 +381,32 @@ public class RegistryProtocol implements Protocol {
* @return url to registry.
*/
private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) {
+ //The address you see at the registry
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
- return providerUrl;
- }
- String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
- // if path is not the same as interface name then we should keep INTERFACE_KEY,
- // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
- // but what we expect is '/dubbo/interface/providers'
- if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
- if (StringUtils.isNotEmpty(extraKeys)) {
- extraKeys += ",";
+ return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
+ MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
+ INTERFACES);
+ } else {
+ String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
+ // if path is not the same as interface name then we should keep INTERFACE_KEY,
+ // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
+ // but what we expect is '/dubbo/interface/providers'
+ if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
+ if (StringUtils.isNotEmpty(extraKeys)) {
+ extraKeys += ",";
+ }
+ extraKeys += INTERFACE_KEY;
}
- extraKeys += INTERFACE_KEY;
+ String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
+ , COMMA_SPLIT_PATTERN.split(extraKeys));
+ return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
}
- String[] extraKeyArrays = COMMA_SPLIT_PATTERN.split(extraKeys);
- String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS, extraKeyArrays);
- // TODO, avoid creation of URL
- //The address you see at the registry
- return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
+
}
private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
- return URLBuilder.from(registeredProviderUrl)
- .setProtocol(PROVIDER_PROTOCOL)
- .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false))
- .build();
+ return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
+ .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
}
/**
@@ -412,7 +420,7 @@ public class RegistryProtocol implements Protocol {
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
}
- return URLBuilder.valueOf(export);
+ return URL.valueOf(export);
}
/**
@@ -422,7 +430,9 @@ public class RegistryProtocol implements Protocol {
* @return
*/
private String getCacheKey(final Invoker<?> originInvoker) {
- return originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
+ URL providerUrl = getProviderUrl(originInvoker);
+ String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
+ return key;
}
@Override
@@ -435,34 +445,33 @@ public class RegistryProtocol implements Protocol {
}
// group="a,b" or group="*"
- Map<String, String> qs = Collections.unmodifiableMap(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)));
+ Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
- return doRefer(getMergeableCluster(), registry, type, url, qs);
+ return doRefer(getMergeableCluster(), registry, type, url);
}
}
- return doRefer(cluster, registry, type, url, qs);
+ return doRefer(cluster, registry, type, url);
}
private Cluster getMergeableCluster() {
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
- private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
- RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url, parameters);
+ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
+ RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
-
- URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.get(REGISTER_IP_KEY), 0, type.getName(), parameters);
- subscribeUrl = toSubscribeUrl(subscribeUrl);
-
+ // all attributes of REFER_KEY
+ Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
+ URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
- directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
+ directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
- directory.subscribe(subscribeUrl);
+ directory.subscribe(toSubscribeUrl(subscribeUrl));
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
@@ -502,16 +511,6 @@ public class RegistryProtocol implements Protocol {
return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
}
- public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
- if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
- return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
- CHECK_KEY, String.valueOf(false));
- } else {
- return URL.valueOf(consumerUrl, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters(
- CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false));
- }
- }
-
private List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
.getActivateExtension(url, "registry.protocol.listener");
@@ -627,7 +626,8 @@ public class RegistryProtocol implements Protocol {
public synchronized void notify(List<URL> urls) {
logger.debug("original override urls: " + urls);
- List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
+ List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
+ CONFIGURATORS_CATEGORY));
logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
// No matching results
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index a322388..efcd7e9 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -57,7 +57,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.FILE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_SNAPSHOT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.ACCEPTS_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
@@ -405,7 +404,7 @@ public abstract class AbstractRegistry implements Registry {
return;
}
if (logger.isInfoEnabled()) {
- logger.info("Notify urls for subscribing service " + url.getServiceKey() + ", provider url size: " + urls.size());
+ logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
@@ -427,9 +426,7 @@ public abstract class AbstractRegistry implements Registry {
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
- if (registryUrl.getParameter(REGISTRY_SNAPSHOT_KEY, false)) {
- saveProperties(url);
- }
+ saveProperties(url);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
deleted file mode 100644
index c2b6929..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.support;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.UrlUtils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
-
-/**
- * Useful for registries who's sdk returns raw string as provider instance, for example, zookeeper and etcd.
- */
-public abstract class CacheableFailbackRegistry extends FailbackRegistry {
-
- protected final ConcurrentMap<URL, ConcurrentMap<String, URL>> stringUrls = new ConcurrentHashMap<>();
-
- public CacheableFailbackRegistry(URL url) {
- super(url);
- }
-
- protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
- if (CollectionUtils.isNotEmpty(providers)) {
- Map<String, URL> consumerStringUrls = stringUrls.computeIfAbsent(consumer, (k) -> new ConcurrentHashMap<>());
- Map<String, URL> copyOfStringUrls = new HashMap<>(consumerStringUrls);
- for (String rawProvider : providers) {
- URL cachedUrl = copyOfStringUrls.remove(rawProvider);
- if (cachedUrl == null) {
- // parse encoded (URLEncoder.encode) url directly.
- URL url = URL.valueOf(rawProvider, true);
- if (isMatch(consumer, url)) {
- consumerStringUrls.put(rawProvider, url);
- }
- }
- }
- copyOfStringUrls.keySet().forEach(consumerStringUrls::remove);
-
- List<URL> urls = new ArrayList<>(consumerStringUrls.size());
- consumerStringUrls.values().forEach(u -> urls.add(UrlUtils.newModifiableUrl(u)));
- return urls;
- }
-
- stringUrls.remove(consumer);
- return new ArrayList<>(1);
- }
-
- protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
- List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
- if (urls.isEmpty()) {
- int i = path.lastIndexOf(PATH_SEPARATOR);
- String category = i < 0 ? path : path.substring(i + 1);
- URL empty = URLBuilder.from(consumer)
- .setProtocol(EMPTY_PROTOCOL)
- .addParameter(CATEGORY_KEY, category)
- .build();
- urls.add(empty);
- }
- return urls;
- }
-
- protected abstract boolean isMatch(URL subscribeUrl, URL providerUrl);
-
-}
diff --git a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
index 93226e8..bc669c6 100644
--- a/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
+++ b/dubbo-registry/dubbo-registry-default/src/main/java/org/apache/dubbo/registry/dubbo/DubboRegistryFactory.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.CALLBACK_INSTANCES_LIMIT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
@@ -105,10 +104,7 @@ public class DubboRegistryFactory extends AbstractRegistryFactory {
urls.add(url.setAddress(address));
}
}
-
- Map<String, String> referParams = url.getParameters();
- URL registryUrl = url.addParameter(INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(REFER_KEY, url.toParameterString());
- RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, registryUrl, referParams);
+ RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, url.addParameter(INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(REFER_KEY, url.toParameterString()));
Invoker<RegistryService> registryInvoker = cluster.join(directory);
RegistryService registryService = proxyFactory.getProxy(registryInvoker);
DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
index 3d7225c..9ce163c 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
@@ -440,7 +440,7 @@ public class RegistryDirectoryTest {
registryDirectory.destroy();
List<Invoker<RegistryDirectoryTest>> cachedInvokers = registryDirectory.getInvokers();
- Map<URL, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
+ Map<String, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
Assertions.assertNull(cachedInvokers);
Assertions.assertEquals(0, urlInvokerMap.size());
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
index ad11041..a15d7d5 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -22,9 +22,9 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.RegistryNotifier;
-import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
+import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.Constants;
import org.apache.dubbo.remoting.etcd.EtcdClient;
import org.apache.dubbo.remoting.etcd.EtcdTransporter;
import org.apache.dubbo.remoting.etcd.StateListener;
@@ -48,6 +48,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
@@ -56,7 +57,7 @@ import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
/**
* Support for ectd3 registry.
*/
-public class EtcdRegistry extends CacheableFailbackRegistry {
+public class EtcdRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);
@@ -219,7 +220,8 @@ public class EtcdRegistry extends CacheableFailbackRegistry {
Optional.ofNullable(listeners.get(listener))
.orElseGet(() -> {
ChildListener watchListener, prev;
- prev = listeners.putIfAbsent(listener, watchListener = new RegistryChildListenerImpl(url, path, listener));
+ prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
+ toUrlsWithEmpty(url, parentPath, currentChildren)));
return prev != null ? prev : watchListener;
});
@@ -328,26 +330,30 @@ public class EtcdRegistry extends CacheableFailbackRegistry {
return categories;
}
- @Override
- protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
- return UrlUtils.isMatch(subscribeUrl, providerUrl);
- }
-
- private class RegistryChildListenerImpl implements ChildListener {
- private RegistryNotifier notifier;
-
- public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener) {
- notifier = new RegistryNotifier(EtcdRegistry.this) {
- @Override
- protected void doNotify(Object rawAddresses) {
- EtcdRegistry.this.notify(consumerUrl, listener, EtcdRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
+ protected List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+ List<URL> urls = new ArrayList<>();
+ if (providers != null && providers.size() > 0) {
+ for (String provider : providers) {
+ provider = URL.decode(provider);
+ if (provider.contains(Constants.HTTP_SUBFIX_KEY)) {
+ URL url = URL.valueOf(provider);
+ if (UrlUtils.isMatch(consumer, url)) {
+ urls.add(url);
+ }
}
- };
+ }
}
+ return urls;
+ }
- @Override
- public void childChanged(String path, List<String> children) {
- notifier.notify(children);
+ protected List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+ List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+ if (urls == null || urls.isEmpty()) {
+ int i = path.lastIndexOf('/');
+ String category = i < 0 ? path : path.substring(i + 1);
+ URL empty = consumer.setProtocol(EMPTY_PROTOCOL).addParameter(CATEGORY_KEY, category);
+ urls.add(empty);
}
+ return urls;
}
}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index 6a62387..187da71 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -27,18 +27,15 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
-import org.apache.dubbo.registry.RegistryNotifier;
import org.apache.dubbo.registry.nacos.util.NacosInstanceManageUtil;
import org.apache.dubbo.registry.support.FailbackRegistry;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
-import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
-import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -492,7 +489,24 @@ public class NacosRegistry extends FailbackRegistry {
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
- EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener);
+ EventListener eventListener = event -> {
+ if (event instanceof NamingEvent) {
+ NamingEvent e = (NamingEvent) event;
+ List<Instance> instances = e.getInstances();
+
+
+ if (isServiceNamesWithCompatibleMode(url)) {
+ /**
+ * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
+ * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
+ */
+ NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
+ instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
+ }
+
+ notifySubscriber(url, listener, instances);
+ }
+ };
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
eventListener);
@@ -612,35 +626,4 @@ public class NacosRegistry extends FailbackRegistry {
void callback(NamingService namingService) throws NacosException;
}
-
- private class RegistryChildListenerImpl implements EventListener {
- private RegistryNotifier notifier;
-
- public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {
- notifier = new RegistryNotifier(NacosRegistry.this) {
- @Override
- protected void doNotify(Object rawAddresses) {
- List<Instance> instances = (List<Instance>) rawAddresses;
- if (isServiceNamesWithCompatibleMode(consumerUrl)) {
- /**
- * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
- * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
- */
- NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
- instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
- }
- NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances);
- }
- };
- }
-
- @Override
- public void onEvent(Event event) {
- if (event instanceof NamingEvent) {
- NamingEvent e = (NamingEvent) event;
- notifier.notify(e.getInstances());
- }
- }
- }
-
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index c0072a7..0938a39 100644
--- a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++ b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.redis;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.RemotingConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -27,7 +28,7 @@ import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
+import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.RpcException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@@ -56,6 +57,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -64,6 +66,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD;
import static org.apache.dubbo.registry.Constants.DEFAULT_SESSION_TIMEOUT;
import static org.apache.dubbo.registry.Constants.REGISTER;
@@ -74,7 +77,7 @@ import static org.apache.dubbo.registry.Constants.UNREGISTER;
/**
* RedisRegistry
*/
-public class RedisRegistry extends CacheableFailbackRegistry {
+public class RedisRegistry extends FailbackRegistry {
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);
@@ -434,17 +437,27 @@ public class RedisRegistry extends CacheableFailbackRegistry {
if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
continue;
}
-
+ List<URL> urls = new ArrayList<>();
Map<String, String> values = jedis.hgetAll(key);
- List<String> rawUrls = new ArrayList<>(values.size());
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
- if (Long.parseLong(entry.getValue()) >= now) {
- rawUrls.add(entry.getKey());
+ URL u = URL.valueOf(entry.getKey());
+ if (!u.getParameter(DYNAMIC_KEY, true)
+ || Long.parseLong(entry.getValue()) >= now) {
+ if (UrlUtils.isMatch(url, u)) {
+ urls.add(u);
+ }
}
}
}
- List<URL> urls = toUrlsWithEmpty(url, category, rawUrls);
+ if (urls.isEmpty()) {
+ urls.add(URLBuilder.from(url)
+ .setProtocol(EMPTY_PROTOCOL)
+ .setAddress(ANYHOST_VALUE)
+ .setPath(toServiceName(key))
+ .addParameter(CATEGORY_KEY, category)
+ .build());
+ }
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
@@ -458,11 +471,6 @@ public class RedisRegistry extends CacheableFailbackRegistry {
}
}
- @Override
- protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
- return !providerUrl.getParameter(DYNAMIC_KEY, true) && UrlUtils.isMatch(subscribeUrl, providerUrl);
- }
-
private String toServiceName(String categoryPath) {
String servicePath = toServicePath(categoryPath);
return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
diff --git a/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java b/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
index 22b804e..4c784b3 100644
--- a/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
+++ b/dubbo-registry/dubbo-registry-sofa/src/main/java/org/apache/dubbo/registry/sofa/SofaRegistry.java
@@ -21,15 +21,12 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.RegistryNotifier;
-import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
+import org.apache.dubbo.registry.support.FailbackRegistry;
import com.alipay.sofa.registry.client.api.RegistryClient;
import com.alipay.sofa.registry.client.api.RegistryClientConfig;
import com.alipay.sofa.registry.client.api.Subscriber;
-import com.alipay.sofa.registry.client.api.SubscriberDataObserver;
import com.alipay.sofa.registry.client.api.model.RegistryType;
import com.alipay.sofa.registry.client.api.model.UserData;
import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
@@ -45,10 +42,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
@@ -63,7 +60,7 @@ import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
*
* @since 2.7.2
*/
-public class SofaRegistry extends CacheableFailbackRegistry {
+public class SofaRegistry extends FailbackRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistry.class);
@@ -168,14 +165,18 @@ public class SofaRegistry extends CacheableFailbackRegistry {
LOGGER.warn("Service name [" + serviceName + "] have bean registered in SOFARegistry.");
CountDownLatch countDownLatch = new CountDownLatch(1);
- handleRegistryData(url, listSubscriber.peekData(), listener, countDownLatch);
+ handleRegistryData(listSubscriber.peekData(), listener, countDownLatch);
waitAddress(serviceName, countDownLatch);
return;
}
final CountDownLatch latch = new CountDownLatch(1);
SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
- new RegistryChildListenerImpl(url, serviceName, listener, latch));
+ (dataId, data) -> {
+ //record change
+ printAddressData(dataId, data);
+ handleRegistryData(data, listener, latch);
+ });
addAttributesForSub(subscriberRegistration);
listSubscriber = registryClient.register(subscriberRegistration);
@@ -206,21 +207,28 @@ public class SofaRegistry extends CacheableFailbackRegistry {
registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.SUBSCRIBER);
}
- private void handleRegistryData(URL subscribeUrl, UserData data, NotifyListener notifyListener,
+ private void handleRegistryData(UserData data, NotifyListener notifyListener,
CountDownLatch latch) {
try {
- List<URL> urls = toUrlsWithEmpty(subscribeUrl, PROVIDERS_CATEGORY, flatUserData(data));
+ List<URL> urls = new ArrayList<>();
+ if (null != data) {
+
+ List<String> datas = flatUserData(data);
+ for (String serviceUrl : datas) {
+ URL url = URL.valueOf(serviceUrl);
+ String serverApplication = url.getParameter(APPLICATION_KEY);
+ if (StringUtils.isNotEmpty(serverApplication)) {
+ url = url.addParameter("dstApp", serverApplication);
+ }
+ urls.add(url);
+ }
+ }
notifyListener.notify(urls);
} finally {
latch.countDown();
}
}
- @Override
- protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
- return UrlUtils.isMatch(subscribeUrl, providerUrl);
- }
-
private String buildServiceName(URL url) {
// return url.getServiceKey();
StringBuilder buf = new StringBuilder();
@@ -281,9 +289,6 @@ public class SofaRegistry extends CacheableFailbackRegistry {
*/
protected List<String> flatUserData(UserData userData) {
List<String> result = new ArrayList<>();
- if (userData == null) {
- return result;
- }
Map<String, List<String>> zoneData = userData.getZoneData();
for (Map.Entry<String, List<String>> entry : zoneData.entrySet()) {
@@ -292,24 +297,4 @@ public class SofaRegistry extends CacheableFailbackRegistry {
return result;
}
-
- private class RegistryChildListenerImpl implements SubscriberDataObserver {
- private RegistryNotifier notifier;
-
- public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
- notifier = new RegistryNotifier(SofaRegistry.this) {
- @Override
- protected void doNotify(Object rawAddresses) {
- //record change
- printAddressData(path, (UserData) rawAddresses);
- handleRegistryData(consumerUrl, (UserData) rawAddresses, listener, latch);
- }
- };
- }
-
- @Override
- public void handleData(String dataId, UserData data) {
- notifier.notify(data);
- }
- }
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index f51a6e6..51fa4aa 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -17,14 +17,15 @@
package org.apache.dubbo.registry.zookeeper;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.URLStrParser;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.RegistryNotifier;
-import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
+import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.zookeeper.ChildListener;
import org.apache.dubbo.remoting.zookeeper.StateListener;
@@ -39,24 +40,26 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_SEPARATOR_ENCODED;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
/**
* ZookeeperRegistry
+ *
*/
-public class ZookeeperRegistry extends CacheableFailbackRegistry {
+public class ZookeeperRegistry extends FailbackRegistry {
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
@@ -165,11 +168,10 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
}
}
} else {
- CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
- ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
+ ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
@@ -177,8 +179,6 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
}
}
notify(url, listener, urls);
- // tells the listener to run only after the sync notification of main thread finishes.
- latch.countDown();
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
@@ -187,9 +187,9 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
- ConcurrentMap<NotifyListener, org.apache.dubbo.remoting.zookeeper.ChildListener> listeners = zkListeners.get(url);
+ ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
- org.apache.dubbo.remoting.zookeeper.ChildListener zkListener = listeners.get(listener);
+ ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
@@ -263,6 +263,35 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}
+ private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
+ List<URL> urls = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(providers)) {
+ for (String provider : providers) {
+ if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
+ URL url = URLStrParser.parseEncodedStr(provider);
+ if (UrlUtils.isMatch(consumer, url)) {
+ urls.add(url);
+ }
+ }
+ }
+ }
+ return urls;
+ }
+
+ private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
+ List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
+ if (urls == null || urls.isEmpty()) {
+ int i = path.lastIndexOf(PATH_SEPARATOR);
+ String category = i < 0 ? path : path.substring(i + 1);
+ URL empty = URLBuilder.from(consumer)
+ .setProtocol(EMPTY_PROTOCOL)
+ .addParameter(CATEGORY_KEY, category)
+ .build();
+ urls.add(empty);
+ }
+ return urls;
+ }
+
/**
* When zookeeper connection recovered from a connection loss, it need to fetch the latest provider list.
* re-register watcher is only a side effect and is not mandate.
@@ -283,53 +312,4 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
}
}
- @Override
- protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
- return UrlUtils.isMatch(subscribeUrl, providerUrl);
- }
-
- private class RegistryChildListenerImpl implements ChildListener {
- private RegistryNotifier notifier;
- private long lastExecuteTime;
- private CountDownLatch latch;
-
- public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
- this.latch = latch;
- notifier = new RegistryNotifier(ZookeeperRegistry.this) {
- @Override
- public void notify(Object rawAddresses) {
- int delayTime = getRegistry().getDelay();
- if (delayTime <= 0) {
- this.doNotify(rawAddresses);
- } else {
- long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
- if (interval > 0) {
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- lastExecuteTime = System.currentTimeMillis();
- this.doNotify(rawAddresses);
- }
- }
-
- @Override
- protected void doNotify(Object rawAddresses) {
- ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
- }
- };
- }
-
- @Override
- public void childChanged(String path, List<String> children) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
- }
- notifier.notify(children);
- }
- }
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index ed7718c..0da56fa 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -264,7 +264,7 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> {
}
}
if (modified > 0) {
- listener.childChanged(path, new ArrayList<>(urls));
+ notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
}
}