You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/08/23 07:43:30 UTC

[dubbo] 02/03: Adjust ClusterInterceptor wrapper.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 33edc5b37cd0bab22ee60e64023264feb1dcebec
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Aug 23 15:43:07 2019 +0800

    Adjust ClusterInterceptor wrapper.
---
 .../cluster/loadbalance/AbstractLoadBalance.java   | 38 ++++++++++++++--------
 .../dubbo/rpc/cluster/support/FailsafeCluster.java |  5 +--
 .../rpc/cluster/support/MergeableCluster.java      |  5 +--
 .../cluster/support/registry/ZoneAwareCluster.java |  2 +-
 .../support/registry/ZoneAwareClusterInvoker.java  |  1 +
 .../cluster/support/wrapper/AbstractCluster.java   |  2 +-
 .../support/wrapper/MockClusterWrapper.java        |  7 ++--
 .../org/apache/dubbo/config/ReferenceConfig.java   |  2 +-
 .../org/apache/dubbo/config/RegistryConfig.java    | 17 ++++++++++
 .../org/apache/dubbo/config/ServiceConfig.java     |  1 -
 .../dubbo/config/builders/RegistryBuilder.java     | 15 +++++++++
 11 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
index ed6f3c4..81fd210 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalance.java
@@ -24,9 +24,9 @@ import org.apache.dubbo.rpc.cluster.LoadBalance;
 
 import java.util.List;
 
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
 import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
 import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
-import static org.apache.dubbo.rpc.cluster.Constants.REMOTE_TIMESTAMP_KEY;
 import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
 import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
 
@@ -44,8 +44,8 @@ public abstract class AbstractLoadBalance implements LoadBalance {
      * @return weight which takes warmup into account
      */
     static int calculateWarmupWeight(int uptime, int warmup, int weight) {
-        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
-        return ww < 1 ? 1 : (ww > weight ? weight : ww);
+        int ww = (int) (uptime / ((float) warmup / weight));
+        return ww < 1 ? 1 : (Math.min(ww, weight));
     }
 
     @Override
@@ -70,19 +70,31 @@ public abstract class AbstractLoadBalance implements LoadBalance {
      * @param invocation the invocation of this invoker
      * @return weight
      */
-    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
-        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
-        if (weight > 0) {
-            long timestamp = invoker.getUrl().getParameter(REMOTE_TIMESTAMP_KEY, 0L);
-            if (timestamp > 0L) {
-                int uptime = (int) (System.currentTimeMillis() - timestamp);
-                int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
-                if (uptime > 0 && uptime < warmup) {
-                    weight = calculateWarmupWeight(uptime, warmup, weight);
+    int getWeight(Invoker<?> invoker, Invocation invocation) {
+        int weight = 0;
+        URL url = invoker.getUrl();
+        // Multiple registry scenario, load balance among multiple registries.
+        if (url.getServiceInterface().equals("org.apache.dubbo.registry.RegistryService")) {
+            weight = url.getParameter("registry.weight", DEFAULT_WEIGHT);
+        } else {
+            weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
+            if (weight > 0) {
+                long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
+                if (timestamp > 0L) {
+                    long uptime = System.currentTimeMillis() - timestamp;
+                    if (uptime >= Integer.MAX_VALUE) {
+                        return weight;
+                    } else if (uptime < 0) {
+                        return 1;
+                    }
+                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
+                    if (uptime > 0 && uptime < warmup) {
+                        weight = calculateWarmupWeight((int) uptime, warmup, weight);
+                    }
                 }
             }
         }
-        return weight >= 0 ? weight : 0;
+        return Math.max(weight, 0);
     }
 
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
index 422335c..eed0b5b 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java
@@ -18,18 +18,19 @@ package org.apache.dubbo.rpc.cluster.support;
 
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
 import org.apache.dubbo.rpc.cluster.Directory;
 
 /**
  * {@link FailsafeClusterInvoker}
  *
  */
-public class FailsafeCluster extends AbstractCluster {
+public class FailsafeCluster implements Cluster {
 
     public final static String NAME = "failsafe";
 
     @Override
-    protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
+    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
         return new FailsafeClusterInvoker<T>(directory);
     }
 
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
index ba5c79c..1e0f43d 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java
@@ -18,14 +18,15 @@ package org.apache.dubbo.rpc.cluster.support;
 
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
 import org.apache.dubbo.rpc.cluster.Directory;
 
-public class MergeableCluster extends AbstractCluster {
+public class MergeableCluster implements Cluster {
 
     public static final String NAME = "mergeable";
 
     @Override
-    protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
+    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
         return new MergeableClusterInvoker<T>(directory);
     }
 
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
index e709e03..abd80f1 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.rpc.cluster.support.registry;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.Directory;
-import org.apache.dubbo.rpc.cluster.support.AbstractCluster;
+import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster;
 
 /**
  * See {@link ZoneAwareClusterInvoker}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
index 49ca4f7..9ea9723 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareClusterInvoker.java
@@ -38,6 +38,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.ZONE_KEY;
 
 /**
  * When there're more than one registry for subscription.
+ *
  * This extension provides a strategy to decide how to distribute traffics among them:
  * 1. registry marked as 'preferred=true' has the highest priority.
  * 2. check the zone the current request belongs, pick the registry that has the same zone first.
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
index 2ead330..f9185b9 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.rpc.cluster.support;
+package org.apache.dubbo.rpc.cluster.support.wrapper;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
index cfe8cec..aba2438 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java
@@ -24,8 +24,11 @@ import org.apache.dubbo.rpc.cluster.Directory;
 /**
  * mock impl
  *
+ * Notice! In addition to mocking, this class also wraps the Invoker with {@link org.apache.dubbo.rpc.ClusterInterceptor}
+ * by inheriting {@link AbstractCluster#join(Directory)}.
+ *
  */
-public class MockClusterWrapper implements Cluster {
+public class MockClusterWrapper extends AbstractCluster {
 
     private Cluster cluster;
 
@@ -34,7 +37,7 @@ public class MockClusterWrapper implements Cluster {
     }
 
     @Override
-    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
+    protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException {
         return new MockClusterInvoker<T>(directory,
                 this.cluster.join(directory));
     }
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 5a72c7a..90423c1 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -74,7 +74,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.SEMICOLON_SPLIT_PATTERN;
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+import static org.apache.dubbo.common.utils.NetUtils.isInvalidLocalHost;
 import static org.apache.dubbo.config.Constants.DUBBO_IP_TO_REGISTRY;
 import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
 import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
index 41e1857..d20c1ab 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/RegistryConfig.java
@@ -174,8 +174,17 @@ public class RegistryConfig extends AbstractConfig {
      */
     private String accepts;
 
+    /**
+     * Always use this registry first if set to true; useful when subscribing to multiple registries.
+     */
     private Boolean preferred;
 
+    /**
+     * Affects traffic distribution among registries; useful when subscribing to multiple registries.
+     * Takes effect only when no preferred registry is specified.
+     */
+    private Integer weight;
+
     public RegistryConfig() {
     }
 
@@ -494,6 +503,14 @@ public class RegistryConfig extends AbstractConfig {
         this.preferred = preferred;
     }
 
+    public Integer getWeight() {
+        return weight;
+    }
+
+    public void setWeight(Integer weight) {
+        this.weight = weight;
+    }
+
     @Override
     public void refresh() {
         super.refresh();
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index c2560e2..d941fa8 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -87,7 +87,6 @@ import static org.apache.dubbo.config.Constants.DUBBO_IP_TO_REGISTRY;
 import static org.apache.dubbo.config.Constants.DUBBO_PORT_TO_BIND;
 import static org.apache.dubbo.config.Constants.DUBBO_PORT_TO_REGISTRY;
 import static org.apache.dubbo.config.Constants.MULTICAST;
-import static org.apache.dubbo.config.Constants.PROTOCOLS_SUFFIX;
 import static org.apache.dubbo.config.Constants.SCOPE_NONE;
 import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
 import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
index 53decfd..e67d358 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/builders/RegistryBuilder.java
@@ -149,8 +149,17 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
      */
     private String accepts;
 
+    /**
+     * Always use this registry first if set to true; useful when subscribing to multiple registries.
+     */
     private Boolean preferred;
 
+    /**
+     * Affects traffic distribution among registries; useful when subscribing to multiple registries.
+     * Takes effect only when no preferred registry is specified.
+     */
+    private Integer weight;
+
     public static RegistryBuilder newBuilder() {
         return new RegistryBuilder();
     }
@@ -316,6 +325,11 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
         return getThis();
     }
 
+    public RegistryBuilder weight(Integer weight) {
+        this.weight = weight;
+        return getThis();
+    }
+
     public RegistryConfig build() {
         RegistryConfig registry = new RegistryConfig();
         super.build(registry);
@@ -347,6 +361,7 @@ public class RegistryBuilder extends AbstractBuilder<RegistryConfig, RegistryBui
         registry.setUseAsMetadataCenter(useAsMetadataCenter);
         registry.setAccepts(accepts);
         registry.setPreferred(preferred);
+        registry.setWeight(weight);
 
         return registry;
     }