You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by me...@apache.org on 2020/05/06 08:47:45 UTC

[dubbo] branch master updated: add new loadbalance strategy (#6064)

This is an automated email from the ASF dual-hosted git repository.

mercyblitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2ba7e  add new loadbalance strategy (#6064)
6d2ba7e is described below

commit 6d2ba7ec7b5a1cb7971143d4262d0a1bfc826d45
Author: August <33...@users.noreply.github.com>
AuthorDate: Wed May 6 16:47:22 2020 +0800

    add new loadbalance strategy (#6064)
    
    * add new loadbalance strategy
    
    * add note
    
    * Update ShortestResponseLoadBalanceTest.java
---
 .../loadbalance/ShortestResponseLoadBalance.java   | 100 +++++++++++++++++++++
 .../org.apache.dubbo.rpc.cluster.LoadBalance       |   3 +-
 .../cluster/loadbalance/LoadBalanceBaseTest.java   |  22 +++++
 .../ShortestResponseLoadBalanceTest.java           |  53 +++++++++++
 4 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
new file mode 100644
index 0000000..610b1b4
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcStatus;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * ShortestResponseLoadBalance
+ * </p>
+ * Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers.
+ * If there is only one invoker, use the invoker directly;
+ * if there are multiple invokers and the weights are not the same, then random according to the total weight;
+ * if there are multiple invokers and the same weight, then randomly called.
+ */
+public class ShortestResponseLoadBalance extends AbstractLoadBalance {
+
+    public static final String NAME = "shortestresponse";
+
+    @Override
+    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+        // Number of invokers
+        int length = invokers.size();
+        // Estimated shortest response time of all invokers
+        long shortestResponse = Long.MAX_VALUE;
+        // The number of invokers having the same estimated shortest response time
+        int shortestCount = 0;
+        // The index of invokers having the same estimated shortest response time
+        int[] shortestIndexes = new int[length];
+        // the weight of every invokers
+        int[] weights = new int[length];
+        // The sum of the warmup weights of all the shortest response  invokers
+        int totalWeight = 0;
+        // The weight of the first shortest response invokers
+        int firstWeight = 0;
+        // Every shortest response invoker has the same weight value?
+        boolean sameWeight = true;
+
+        // Filter out all the shortest response invokers
+        for (int i = 0; i < length; i++) {
+            Invoker<T> invoker = invokers.get(i);
+            RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
+            // Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
+            long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
+            int active = rpcStatus.getActive();
+            long estimateResponse = succeededAverageElapsed * active;
+            int afterWarmup = getWeight(invoker, invocation);
+            weights[i] = afterWarmup;
+            // Same as LeastActiveLoadBalance
+            if (estimateResponse < shortestResponse) {
+                shortestResponse = estimateResponse;
+                shortestCount = 1;
+                shortestIndexes[0] = i;
+                totalWeight = afterWarmup;
+                firstWeight = afterWarmup;
+                sameWeight = true;
+            } else if (estimateResponse == shortestResponse) {
+                shortestIndexes[shortestCount++] = i;
+                totalWeight += afterWarmup;
+                if (sameWeight && i > 0
+                        && afterWarmup != firstWeight) {
+                    sameWeight = false;
+                }
+            }
+        }
+        if (shortestCount == 1) {
+            return invokers.get(shortestIndexes[0]);
+        }
+        if (!sameWeight && totalWeight > 0) {
+            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
+            for (int i = 0; i < shortestCount; i++) {
+                int shortestIndex = shortestIndexes[i];
+                offsetWeight -= weights[shortestIndex];
+                if (offsetWeight < 0) {
+                    return invokers.get(shortestIndex);
+                }
+            }
+        }
+        return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
+    }
+}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
index 69e1cff..8a636a9 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
@@ -1,4 +1,5 @@
 random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
 roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
 leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
-consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
\ No newline at end of file
+consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
+shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
\ No newline at end of file
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
index fef733a..f9ab3ea 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
@@ -58,6 +58,7 @@ public class LoadBalanceBaseTest {
     RpcStatus weightTestRpcStatus1;
     RpcStatus weightTestRpcStatus2;
     RpcStatus weightTestRpcStatus3;
+    RpcStatus weightTestRpcStatus5;
 
     RpcInvocation weightTestInvocation;
 
@@ -205,10 +206,13 @@ public class LoadBalanceBaseTest {
     }
 
     protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
+    protected List<Invoker<LoadBalanceBaseTest>> weightInvokersSR= new ArrayList<Invoker<LoadBalanceBaseTest>>();
+
     protected Invoker<LoadBalanceBaseTest> weightInvoker1;
     protected Invoker<LoadBalanceBaseTest> weightInvoker2;
     protected Invoker<LoadBalanceBaseTest> weightInvoker3;
     protected Invoker<LoadBalanceBaseTest> weightInvokerTmp;
+    protected Invoker<LoadBalanceBaseTest> weightInvoker5;
 
     @BeforeEach
     public void before() throws Exception {
@@ -216,6 +220,7 @@ public class LoadBalanceBaseTest {
         weightInvoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly());
         weightInvoker3 = mock(Invoker.class, Mockito.withSettings().stubOnly());
         weightInvokerTmp = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        weightInvoker5 = mock(Invoker.class, Mockito.withSettings().stubOnly());
 
         weightTestInvocation = new RpcInvocation();
         weightTestInvocation.setMethodName("test");
@@ -224,6 +229,7 @@ public class LoadBalanceBaseTest {
         URL url2 = URL.valueOf("test2://127.0.0.1:12/DemoService?weight=9&active=0");
         URL url3 = URL.valueOf("test3://127.0.0.1:13/DemoService?weight=6&active=1");
         URL urlTmp = URL.valueOf("test4://127.0.0.1:9999/DemoService?weight=11&active=0");
+        URL url5 = URL.valueOf("test5://127.0.0.1:15/DemoService?weight=15&active=0");
 
         given(weightInvoker1.isAvailable()).willReturn(true);
         given(weightInvoker1.getInterface()).willReturn(LoadBalanceBaseTest.class);
@@ -241,16 +247,32 @@ public class LoadBalanceBaseTest {
         given(weightInvokerTmp.getInterface()).willReturn(LoadBalanceBaseTest.class);
         given(weightInvokerTmp.getUrl()).willReturn(urlTmp);
 
+        given(weightInvoker5.isAvailable()).willReturn(true);
+        given(weightInvoker5.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvoker5.getUrl()).willReturn(url5);
+
         weightInvokers.add(weightInvoker1);
         weightInvokers.add(weightInvoker2);
         weightInvokers.add(weightInvoker3);
 
+        weightInvokersSR.add(weightInvoker1);
+        weightInvokersSR.add(weightInvoker2);
+        weightInvokersSR.add(weightInvoker5);
+
         weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName());
         weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName());
         weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+        weightTestRpcStatus5 = RpcStatus.getStatus(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+
 
         // weightTestRpcStatus3 active is 1
         RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+
+        // weightTestRpcStatus5 shortest response time of success calls is bigger than 0
+        // weightTestRpcStatus5 active is 1
+        RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+        RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true);
+        RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
     }
 
     protected Map<Invoker, InvokeResult> getWeightedInvokeResult(int runs, String loadbalanceName) {
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
new file mode 100644
index 0000000..86814ae
--- /dev/null
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License")); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.rpc.Invoker;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
+
+    @Test
+    public void testSelectByWeight() {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int loop = 10000;
+
+        ShortestResponseLoadBalance lb = new ShortestResponseLoadBalance();
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+            }
+            // never select invoker5 because it's estimated response time is more than invoker1 and invoker2
+            Assertions.assertTrue(!selected.getUrl().getProtocol().equals("test5"), "select is not the shortest one");
+        }
+
+        // the sumInvoker1 : sumInvoker2 approximately equal to 1: 9
+        System.out.println(sumInvoker1);
+        System.out.println(sumInvoker2);
+
+        Assertions.assertEquals(sumInvoker1 + sumInvoker2, loop, "select failed!");
+    }
+}