You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/08/23 07:43:30 UTC
[dubbo] 02/03: Adjust ClusterInterceptor wrapper.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 33edc5b37cd0bab22ee60e64023264feb1dcebec
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Aug 23 15:43:07 2019 +0800
Adjust ClusterInterceptor wrapper.
---
.../cluster/loadbalance/AbstractLoadBalance.java | 38 ++++++++++++++--------
.../dubbo/rpc/cluster/support/FailsafeCluster.java | 5 +--
.../rpc/cluster/support/MergeableCluster.java | 5 +--
.../cluster/support/registry/ZoneAwareCluster.java | 2 +-
.../support/registry/ZoneAwareClusterInvoker.java | 1 +
.../cluster/support/wrapper/AbstractCluster.java | 2 +-
.../support/wrapper/MockClusterWrapper.java | 7 ++--
.../org/apache/dubbo/config/ReferenceConfig.java | 2 +-
.../org/apache/dubbo/config/RegistryConfig.java | 17 ++++++++++
.../org/apache/dubbo/config/ServiceConfig.java | 1 -
.../dubbo/config/builders/RegistryBuilder.java | 15 +++++++++
11 files changed, 72 insertions(+), 23 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
index ed6f3c4..81fd210 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
@@ -24,9 +24,9 @@ import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.List;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
-import static org.apache.dubbo.rpc.cluster.Constants.REMOTE_TIMESTAMP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
@@ -44,8 +44,8 @@ public abstract class AbstractLoadBalance implements LoadBalance {
* @return weight which takes warmup into account
*/
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
- int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
- return ww < 1 ? 1 : (ww > weight ? weight : ww);
+ int ww = (int) (uptime / ((float) warmup / weight));
+ return ww < 1 ? 1 : (Math.min(ww, weight));
}
@Override
@@ -70,19 +70,31 @@ public abstract class AbstractLoadBalance implements LoadBalance {
* @param invocation the invocation of this invoker
* @return weight
*/
- protected int getWeight(Invoker<?> invoker, Invocation invocation) {
- int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
- if (weight > 0) {
- long timestamp = invoker.getUrl().getParameter(REMOTE_TIMESTAMP_KEY, 0L);
- if (timestamp > 0L) {
- int uptime = (int) (System.currentTimeMillis() - timestamp);
- int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
- if (uptime > 0 && uptime < warmup) {
- weight = calculateWarmupWeight(uptime, warmup, weight);
+ int getWeight(Invoker<?> invoker, Invocation invocation) {
+ int weight = 0;
+ URL url = invoker.getUrl();
+ // Multiple registry scenario, load balance among multiple registries.
+ if (url.getServiceInterface().equals("org.apache.dubbo.registry.RegistryService")) {
+ weight = url.getParameter("registry.weight", DEFAULT_WEIGHT);
+ } else {
+ weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
+ if (weight > 0) {
+ long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
+ if (timestamp > 0L) {
+ long uptime = System.currentTimeMillis() - timestamp;
+ if (uptime >= Integer.MAX_VALUE) {
+ return weight;
+ } else if (uptime < 0) {
+ return 1;
+ }
+ int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
+ if (uptime > 0 && uptime < warmup) {
+ weight = calculateWarmupWeight((int) uptime, warmup, weight);
+ }
}
}
}
- return weight >= 0 ? weight : 0;
+ return Math.max(weight, 0);
}
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
index 422335c..eed0b5b 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
@@ -18,18 +18,19 @@ package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
/**
* {@link FailsafeClusterInvoker}
*
*/
-public class FailsafeCluster extends AbstractCluster {
+public class FailsafeCluster implements Cluster {
public final static String NAME = "failsafe";
@Override
- protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
+ public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailsafeClusterInvoker<T>(directory);
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
index ba5c79c..1e0f43d 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
@@ -18,14 +18,15 @@ package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
-public class MergeableCluster extends AbstractCluster {
+public class MergeableCluster implements Cluster {
public static final String NAME = "mergeable";
@Override
- protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
+ public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MergeableClusterInvoker<T>(directory);
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
index e709e03..abd80f1 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.rpc.cluster.support.registry;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
-import org.apache.dubbo.rpc.cluster.support.AbstractCluster;
+import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;
/**
* See {@link ZoneAwareClusterInvoker}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
index 49ca4f7..9ea9723 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
@@ -38,6 +38,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.ZONE_KEY;
/**
* When there're more than one registry for subscription.
+ *
* This extension provides a strategy to decide how to distribute traffics among them:
* 1. registry marked as 'preferred=true' has the highest priority.
* 2. check the zone the current request belongs, pick the registry that has the same zone first.
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
index 2ead330..f9185b9 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.cluster.support;
+package org.apache.dubbo.rpc.cluster.support.wrapper;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
index cfe8cec..aba2438 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
@@ -24,8 +24,11 @@ import org.apache.dubbo.rpc.cluster.Directory;
/**
* mock impl
*
+ * Notice! Besides mocking, this class also wraps the Invoker with {@link org.apache.dubbo.rpc.ClusterInterceptor}
+ * by inheriting {@link AbstractCluster#join(Directory)}.
+ *
*/
-public class MockClusterWrapper implements Cluster {
+public class MockClusterWrapper extends AbstractCluster {
private Cluster cluster;
@@ -34,7 +37,7 @@ public class MockClusterWrapper implements Cluster {
}
@Override
- public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
+ protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 5a72c7a..90423c1 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -74,7 +74,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+import static org.apache.dubbo.common.utils.NetUtils.isInvalidLocalHost;
import static org.apache.dubbo.config.Constants.DUBBO_IP_TO_REGISTRY;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
index 41e1857..d20c1ab 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
@@ -174,8 +174,17 @@ public class RegistryConfig extends AbstractConfig {
*/
private String accepts;
+ /**
+ * Always use this registry first if set to true; useful when subscribing to multiple registries.
+ */
private Boolean preferred;
+ /**
+ * Affects traffic distribution among registries; useful when subscribing to multiple registries.
+ * Takes effect only when no preferred registry is specified.
+ */
+ private Integer weight;
+
public RegistryConfig() {
}
@@ -494,6 +503,14 @@ public class RegistryConfig extends AbstractConfig {
this.preferred = preferred;
}
+ public Integer getWeight() {
+ return weight;
+ }
+
+ public void setWeight(Integer weight) {
+ this.weight = weight;
+ }
+
@Override
public void refresh() {
super.refresh();
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index c2560e2..d941fa8 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -87,7 +87,6 @@ import static org.apache.dubbo.config.Constants.DUBBO_IP_TO_REGISTRY;
import static org.apache.dubbo.config.Constants.DUBBO_PORT_TO_BIND;
import static org.apache.dubbo.config.Constants.DUBBO_PORT_TO_REGISTRY;
import static org.apache.dubbo.config.Constants.MULTICAST;
-import static org.apache.dubbo.config.Constants.PROTOCOLS_SUFFIX;
import static org.apache.dubbo.config.Constants.SCOPE_NONE;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
index 53decfd..e67d358 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
@@ -149,8 +149,17 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
*/
private String accepts;
+ /**
+ * Always use this registry first if set to true; useful when subscribing to multiple registries.
+ */
private Boolean preferred;
+ /**
+ * Affects traffic distribution among registries; useful when subscribing to multiple registries.
+ * Takes effect only when no preferred registry is specified.
+ */
+ private Integer weight;
+
public static RegistryBuilder newBuilder() {
return new RegistryBuilder();
}
@@ -316,6 +325,11 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
return getThis();
}
+ public RegistryBuilder weight(Integer weight) {
+ this.weight = weight;
+ return getThis();
+ }
+
public RegistryConfig build() {
RegistryConfig registry = new RegistryConfig();
super.build(registry);
@@ -347,6 +361,7 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
registry.setUseAsMetadataCenter(useAsMetadataCenter);
registry.setAccepts(accepts);
registry.setPreferred(preferred);
+ registry.setWeight(weight);
return registry;
}