You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by me...@apache.org on 2020/05/06 08:47:45 UTC
[dubbo] branch master updated: add new loadbalance strategy (#6064)
This is an automated email from the ASF dual-hosted git repository.
mercyblitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2ba7e add new loadbalance strategy (#6064)
6d2ba7e is described below
commit 6d2ba7ec7b5a1cb7971143d4262d0a1bfc826d45
Author: August <33...@users.noreply.github.com>
AuthorDate: Wed May 6 16:47:22 2020 +0800
add new loadbalance strategy (#6064)
* add new loadbalance strategy
* add note
* Update ShortestResponseLoadBalanceTest.java
---
.../loadbalance/ShortestResponseLoadBalance.java | 100 +++++++++++++++++++++
.../org.apache.dubbo.rpc.cluster.LoadBalance | 3 +-
.../cluster/loadbalance/LoadBalanceBaseTest.java | 22 +++++
.../ShortestResponseLoadBalanceTest.java | 53 +++++++++++
4 files changed, 177 insertions(+), 1 deletion(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
new file mode 100644
index 0000000..610b1b4
--- /dev/null
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcStatus;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * ShortestResponseLoadBalance
+ * </p>
+ * Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers.
+ * If there is only one invoker, use the invoker directly;
+ * if there are multiple invokers and the weights are not the same, then random according to the total weight;
+ * if there are multiple invokers and the same weight, then randomly called.
+ */
+public class ShortestResponseLoadBalance extends AbstractLoadBalance {
+
+ public static final String NAME = "shortestresponse";
+
+ @Override
+ protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+ // Number of invokers
+ int length = invokers.size();
+ // Estimated shortest response time of all invokers
+ long shortestResponse = Long.MAX_VALUE;
+ // The number of invokers having the same estimated shortest response time
+ int shortestCount = 0;
+ // The index of invokers having the same estimated shortest response time
+ int[] shortestIndexes = new int[length];
+ // the weight of every invokers
+ int[] weights = new int[length];
+ // The sum of the warmup weights of all the shortest response invokers
+ int totalWeight = 0;
+ // The weight of the first shortest response invokers
+ int firstWeight = 0;
+ // Every shortest response invoker has the same weight value?
+ boolean sameWeight = true;
+
+ // Filter out all the shortest response invokers
+ for (int i = 0; i < length; i++) {
+ Invoker<T> invoker = invokers.get(i);
+ RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
+ // Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
+ long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
+ int active = rpcStatus.getActive();
+ long estimateResponse = succeededAverageElapsed * active;
+ int afterWarmup = getWeight(invoker, invocation);
+ weights[i] = afterWarmup;
+ // Same as LeastActiveLoadBalance
+ if (estimateResponse < shortestResponse) {
+ shortestResponse = estimateResponse;
+ shortestCount = 1;
+ shortestIndexes[0] = i;
+ totalWeight = afterWarmup;
+ firstWeight = afterWarmup;
+ sameWeight = true;
+ } else if (estimateResponse == shortestResponse) {
+ shortestIndexes[shortestCount++] = i;
+ totalWeight += afterWarmup;
+ if (sameWeight && i > 0
+ && afterWarmup != firstWeight) {
+ sameWeight = false;
+ }
+ }
+ }
+ if (shortestCount == 1) {
+ return invokers.get(shortestIndexes[0]);
+ }
+ if (!sameWeight && totalWeight > 0) {
+ int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
+ for (int i = 0; i < shortestCount; i++) {
+ int shortestIndex = shortestIndexes[i];
+ offsetWeight -= weights[shortestIndex];
+ if (offsetWeight < 0) {
+ return invokers.get(shortestIndex);
+ }
+ }
+ }
+ return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
+ }
+}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
index 69e1cff..8a636a9 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
@@ -1,4 +1,5 @@
random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
-consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
\ No newline at end of file
+consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
+shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
\ No newline at end of file
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
index fef733a..f9ab3ea 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
@@ -58,6 +58,7 @@ public class LoadBalanceBaseTest {
RpcStatus weightTestRpcStatus1;
RpcStatus weightTestRpcStatus2;
RpcStatus weightTestRpcStatus3;
+ RpcStatus weightTestRpcStatus5;
RpcInvocation weightTestInvocation;
@@ -205,10 +206,13 @@ public class LoadBalanceBaseTest {
}
protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>();
+ protected List<Invoker<LoadBalanceBaseTest>> weightInvokersSR= new ArrayList<Invoker<LoadBalanceBaseTest>>();
+
protected Invoker<LoadBalanceBaseTest> weightInvoker1;
protected Invoker<LoadBalanceBaseTest> weightInvoker2;
protected Invoker<LoadBalanceBaseTest> weightInvoker3;
protected Invoker<LoadBalanceBaseTest> weightInvokerTmp;
+ protected Invoker<LoadBalanceBaseTest> weightInvoker5;
@BeforeEach
public void before() throws Exception {
@@ -216,6 +220,7 @@ public class LoadBalanceBaseTest {
weightInvoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly());
weightInvoker3 = mock(Invoker.class, Mockito.withSettings().stubOnly());
weightInvokerTmp = mock(Invoker.class, Mockito.withSettings().stubOnly());
+ weightInvoker5 = mock(Invoker.class, Mockito.withSettings().stubOnly());
weightTestInvocation = new RpcInvocation();
weightTestInvocation.setMethodName("test");
@@ -224,6 +229,7 @@ public class LoadBalanceBaseTest {
URL url2 = URL.valueOf("test2://127.0.0.1:12/DemoService?weight=9&active=0");
URL url3 = URL.valueOf("test3://127.0.0.1:13/DemoService?weight=6&active=1");
URL urlTmp = URL.valueOf("test4://127.0.0.1:9999/DemoService?weight=11&active=0");
+ URL url5 = URL.valueOf("test5://127.0.0.1:15/DemoService?weight=15&active=0");
given(weightInvoker1.isAvailable()).willReturn(true);
given(weightInvoker1.getInterface()).willReturn(LoadBalanceBaseTest.class);
@@ -241,16 +247,32 @@ public class LoadBalanceBaseTest {
given(weightInvokerTmp.getInterface()).willReturn(LoadBalanceBaseTest.class);
given(weightInvokerTmp.getUrl()).willReturn(urlTmp);
+ given(weightInvoker5.isAvailable()).willReturn(true);
+ given(weightInvoker5.getInterface()).willReturn(LoadBalanceBaseTest.class);
+ given(weightInvoker5.getUrl()).willReturn(url5);
+
weightInvokers.add(weightInvoker1);
weightInvokers.add(weightInvoker2);
weightInvokers.add(weightInvoker3);
+ weightInvokersSR.add(weightInvoker1);
+ weightInvokersSR.add(weightInvoker2);
+ weightInvokersSR.add(weightInvoker5);
+
weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName());
weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName());
weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+ weightTestRpcStatus5 = RpcStatus.getStatus(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+
// weightTestRpcStatus3 active is 1
RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+
+ // weightTestRpcStatus5 shortest response time of success calls is bigger than 0
+ // weightTestRpcStatus5 active is 1
+ RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+ RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true);
+ RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
}
protected Map<Invoker, InvokeResult> getWeightedInvokeResult(int runs, String loadbalanceName) {
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
new file mode 100644
index 0000000..86814ae
--- /dev/null
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License")); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.rpc.Invoker;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
+
+ @Test
+ public void testSelectByWeight() {
+ int sumInvoker1 = 0;
+ int sumInvoker2 = 0;
+ int loop = 10000;
+
+ ShortestResponseLoadBalance lb = new ShortestResponseLoadBalance();
+ for (int i = 0; i < loop; i++) {
+ Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);
+
+ if (selected.getUrl().getProtocol().equals("test1")) {
+ sumInvoker1++;
+ }
+
+ if (selected.getUrl().getProtocol().equals("test2")) {
+ sumInvoker2++;
+ }
+ // never select invoker5 because it's estimated response time is more than invoker1 and invoker2
+ Assertions.assertTrue(!selected.getUrl().getProtocol().equals("test5"), "select is not the shortest one");
+ }
+
+ // the sumInvoker1 : sumInvoker2 approximately equal to 1: 9
+ System.out.println(sumInvoker1);
+ System.out.println(sumInvoker2);
+
+ Assertions.assertEquals(sumInvoker1 + sumInvoker2, loop, "select failed!");
+ }
+}