You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/15 09:50:22 UTC
[dubbo] branch 3.0 updated: optimization about
shortestResponseLoadBalance (#8441)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ffb5209 optimization about shortestResponseLoadBalance (#8441)
ffb5209 is described below
commit ffb520908a9fdef12475351ee2af9f3bb1b09410
Author: lmj <10...@qq.com>
AuthorDate: Sun Aug 15 17:50:14 2021 +0800
optimization about shortestResponseLoadBalance (#8441)
---
.../org/apache/dubbo/rpc/cluster/Constants.java | 5 ++
.../loadbalance/ShortestResponseLoadBalance.java | 70 ++++++++++++++++++++--
.../ShortestResponseLoadBalanceTest.java | 62 ++++++++++++++++++-
3 files changed, 130 insertions(+), 7 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
index 320b485..62a6b94 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
@@ -118,4 +118,9 @@ public interface Constants {
String ARGUMENTS = "arguments";
String NEED_REEXPORT = "need-reexport";
+
+ /**
+ * Configuration key for the slide-window period (in milliseconds) used by ShortestResponseLoadBalance.
+ */
+ String SHORTEST_RESPONSE_SLIDE_PERIOD = "shortestResponseSlidePeriod";
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
index 610b1b4..9c0967b 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
@@ -17,17 +17,26 @@
package org.apache.dubbo.rpc.cluster.loadbalance;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcStatus;
+import org.apache.dubbo.rpc.cluster.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* ShortestResponseLoadBalance
* <p>
- * Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers.
+ * Filter the invokers with the shortest estimated response time of
+ * successful calls, and count the weights and quantities of these invokers, within the last slide window.
* If there is only one invoker, use the invoker directly;
* if there are multiple invokers and the weights are not the same, then random according to the total weight;
* if there are multiple invokers and the same weight, then randomly called.
@@ -36,6 +45,46 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
public static final String NAME = "shortestresponse";
+ private static final int SLIDE_PERIOD = ApplicationModel.getEnvironment().getConfiguration().getInt(Constants.SHORTEST_RESPONSE_SLIDE_PERIOD, 30_000);
+
+ private ConcurrentMap<RpcStatus, SlideWindowData> methodMap = new ConcurrentHashMap<>();
+
+ private AtomicBoolean onResetSlideWindow = new AtomicBoolean(false);
+
+ private volatile long lastUpdateTime = System.currentTimeMillis();
+
+ protected static class SlideWindowData { // view of one RpcStatus restricted to the successes recorded since the last window reset
+ private final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor((new NamedThreadFactory("Dubbo-slidePeriod-reset"))); // single shared thread; doSelect submits the window-reset task to it
+ // Baselines captured by reset(); they are subtracted from the live RpcStatus totals to obtain in-window figures.
+ private long succeededOffset; // rpcStatus.getSucceeded() as of the last reset (0 until the first reset)
+ private long succeededElapsedOffset; // rpcStatus.getSucceededElapsed() as of the last reset (0 until the first reset)
+ private RpcStatus rpcStatus; // live counters this window is measured against; assigned only in the constructor
+ // NOTE(review): the offsets are plain (non-volatile) fields, yet reset() is run on EXECUTOR_SERVICE while getEstimateResponse() is called from doSelect - confirm this visibility is acceptable.
+ public SlideWindowData(RpcStatus rpcStatus) {
+ this.rpcStatus = rpcStatus;
+ this.succeededOffset = 0; // zero baseline: the first window covers everything rpcStatus has recorded so far
+ this.succeededElapsedOffset = 0;
+ }
+ // Starts a new window by moving both baselines to the current RpcStatus totals.
+ public void reset() {
+ this.succeededOffset = rpcStatus.getSucceeded(); // the two totals are read one after the other, not as a single atomic snapshot
+ this.succeededElapsedOffset = rpcStatus.getSucceededElapsed();
+ }
+ // Average elapsed value of the successes recorded since the last reset; 0 when there were none.
+ private long getSucceededAverageElapsed() {
+ long succeed = this.rpcStatus.getSucceeded() - this.succeededOffset; // successes inside the current window
+ if (succeed == 0) { // nothing succeeded in this window: return 0 instead of dividing by zero
+ return 0;
+ }
+ return (this.rpcStatus.getSucceededElapsed() - this.succeededElapsedOffset) / succeed; // integer division, truncates
+ }
+ // Estimated response: in-window average elapsed multiplied by (active + 1).
+ public long getEstimateResponse() {
+ int active = this.rpcStatus.getActive() + 1; // +1 presumably accounts for the call being routed (TODO confirm); it also keeps the product from being 0 whenever active is 0
+ return getSucceededAverageElapsed() * active;
+ }
+ }
+
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
@@ -59,10 +108,10 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
+ SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus, SlideWindowData::new);
+
// Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
- long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
- int active = rpcStatus.getActive();
- long estimateResponse = succeededAverageElapsed * active;
+ long estimateResponse = slideWindowData.getEstimateResponse();
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// Same as LeastActiveLoadBalance
@@ -77,11 +126,22 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
- && afterWarmup != firstWeight) {
+ && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
+
+ if (System.currentTimeMillis() - lastUpdateTime > SLIDE_PERIOD
+ && onResetSlideWindow.compareAndSet(false, true)) {
+ // reset slideWindowData asynchronously on the shared reset thread
+ SlideWindowData.EXECUTOR_SERVICE.execute(() -> {
+ methodMap.values().forEach(SlideWindowData::reset);
+ lastUpdateTime = System.currentTimeMillis();
+ onResetSlideWindow.set(false);
+ });
+ }
+
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
index 86814ae..cfc317f 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
@@ -16,14 +16,26 @@
*/
package org.apache.dubbo.rpc.cluster.loadbalance;
+import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.rpc.Invoker;
-
+import org.apache.dubbo.rpc.RpcStatus;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
-public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest {
@Test
+ @Order(0)
public void testSelectByWeight() {
int sumInvoker1 = 0;
int sumInvoker2 = 0;
@@ -50,4 +62,50 @@ public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
Assertions.assertEquals(sumInvoker1 + sumInvoker2, loop, "select failed!");
}
+
+ @Test // after forcing a slide-window reset, selections over the three invokers must track their configured weights within the deviation asserted below
+ @Order(1)
+ public void testSelectByResponse() throws NoSuchFieldException, IllegalAccessException {
+ int sumInvoker1 = 0;
+ int sumInvoker2 = 0;
+ int sumInvoker5 = 0;
+ int loop = 10000;
+
+ // active -> 0 (original note); endCount is given 5000L and true - presumably one finished, successful 5000 ms call for weightInvoker5 (verify against RpcStatus.endCount)
+ RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true);
+ ShortestResponseLoadBalance lb = new ShortestResponseLoadBalance();
+
+ // reset slideWindow: backdate the private lastUpdateTime field by 31 s so the next select() exceeds the default 30_000 ms slide period and submits a reset
+ Field lastUpdateTimeField = ReflectUtils.forName(ShortestResponseLoadBalance.class.getName()).getDeclaredField("lastUpdateTime");
+ lastUpdateTimeField.setAccessible(true);
+ lastUpdateTimeField.setLong(lb, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31));
+ lb.select(weightInvokersSR, null, weightTestInvocation); // submits the reset; NOTE(review): the reset runs on another thread and is not awaited before the loop below
+
+ for (int i = 0; i < loop; i++) {
+ Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);
+
+ if (selected.getUrl().getProtocol().equals("test1")) { // invokers are told apart by URL protocol
+ sumInvoker1++;
+ }
+
+ if (selected.getUrl().getProtocol().equals("test2")) {
+ sumInvoker2++;
+ }
+
+ if (selected.getUrl().getProtocol().equals("test5")) {
+ sumInvoker5++;
+ }
+ }
+ Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = weightInvokersSR.stream()
+ .collect(Collectors.toMap(Function.identity(), e -> Integer.valueOf(e.getUrl().getParameter("weight")))); // invoker -> value of its "weight" URL parameter
+ Integer totalWeight = weightMap.values().stream().reduce(0, Integer::sum);
+ // max deviation 10%
+ int expectWeightValue = loop / totalWeight; // expected number of selections per unit of weight (integer division)
+ int maxDeviation = expectWeightValue / 10; // 10% of the per-unit expectation
+
+ Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker5, loop, "select failed!"); // every selection hit one of the three invokers; NOTE(review): JUnit's assertEquals takes (expected, actual), so the arguments look swapped - only the failure message is affected
+ Assertions.assertTrue(Math.abs(sumInvoker1 / weightMap.get(weightInvoker1) - expectWeightValue) < maxDeviation, "select failed!"); // selections per unit weight (integer division) must stay within maxDeviation of the expectation
+ Assertions.assertTrue(Math.abs(sumInvoker2 / weightMap.get(weightInvoker2) - expectWeightValue) < maxDeviation, "select failed!");
+ Assertions.assertTrue(Math.abs(sumInvoker5 / weightMap.get(weightInvoker5) - expectWeightValue) < maxDeviation, "select failed!");
+ }
}