You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/15 09:50:22 UTC

[dubbo] branch 3.0 updated: optimization about shortestResponseLoadBalance (#8441)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ffb5209  optimization about shortestResponseLoadBalance (#8441)
ffb5209 is described below

commit ffb520908a9fdef12475351ee2af9f3bb1b09410
Author: lmj <10...@qq.com>
AuthorDate: Sun Aug 15 17:50:14 2021 +0800

    optimization about shortestResponseLoadBalance (#8441)
---
 .../org/apache/dubbo/rpc/cluster/Constants.java    |  5 ++
 .../loadbalance/ShortestResponseLoadBalance.java   | 70 ++++++++++++++++++++--
 .../ShortestResponseLoadBalanceTest.java           | 62 ++++++++++++++++++-
 3 files changed, 130 insertions(+), 7 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
index 320b485..62a6b94 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Constants.java
@@ -118,4 +118,9 @@ public interface Constants {
     String ARGUMENTS = "arguments";
 
     String NEED_REEXPORT = "need-reexport";
+
+    /**
+     * The key of shortestResponseSlidePeriod
+     */
+    String SHORTEST_RESPONSE_SLIDE_PERIOD = "shortestResponseSlidePeriod";
 }
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
index 610b1b4..9c0967b 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalance.java
@@ -17,17 +17,26 @@
 package org.apache.dubbo.rpc.cluster.loadbalance;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcStatus;
+import org.apache.dubbo.rpc.cluster.Constants;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * ShortestResponseLoadBalance
  * </p>
- * Filter the number of invokers with the shortest response time of success calls and count the weights and quantities of these invokers.
+ * Filter the number of invokers with the shortest response time of
+ * success calls and count the weights and quantities of these invokers in last slide window.
  * If there is only one invoker, use the invoker directly;
  * if there are multiple invokers and the weights are not the same, then random according to the total weight;
  * if there are multiple invokers and the same weight, then randomly called.
@@ -36,6 +45,46 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
 
     public static final String NAME = "shortestresponse";
 
+    private static final int SLIDE_PERIOD = ApplicationModel.getEnvironment().getConfiguration().getInt(Constants.SHORTEST_RESPONSE_SLIDE_PERIOD, 30_000);
+
+    private ConcurrentMap<RpcStatus, SlideWindowData> methodMap = new ConcurrentHashMap<>();
+
+    private AtomicBoolean onResetSlideWindow = new AtomicBoolean(false);
+
+    private volatile long lastUpdateTime = System.currentTimeMillis();
+
+    protected static class SlideWindowData {
+        private final static ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor((new NamedThreadFactory("Dubbo-slidePeriod-reset")));
+
+        private long succeededOffset;
+        private long succeededElapsedOffset;
+        private RpcStatus rpcStatus;
+
+        public SlideWindowData(RpcStatus rpcStatus) {
+            this.rpcStatus = rpcStatus;
+            this.succeededOffset = 0;
+            this.succeededElapsedOffset = 0;
+        }
+
+        public void reset() {
+            this.succeededOffset = rpcStatus.getSucceeded();
+            this.succeededElapsedOffset = rpcStatus.getSucceededElapsed();
+        }
+
+        private long getSucceededAverageElapsed() {
+            long succeed = this.rpcStatus.getSucceeded() - this.succeededOffset;
+            if (succeed == 0) {
+                return 0;
+            }
+            return (this.rpcStatus.getSucceededElapsed() - this.succeededElapsedOffset) / succeed;
+        }
+
+        public long getEstimateResponse() {
+            int active = this.rpcStatus.getActive() + 1;
+            return getSucceededAverageElapsed() * active;
+        }
+    }
+
     @Override
     protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
         // Number of invokers
@@ -59,10 +108,10 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
         for (int i = 0; i < length; i++) {
             Invoker<T> invoker = invokers.get(i);
             RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
+            SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus, SlideWindowData::new);
+
             // Calculate the estimated response time from the product of active connections and succeeded average elapsed time.
-            long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
-            int active = rpcStatus.getActive();
-            long estimateResponse = succeededAverageElapsed * active;
+            long estimateResponse = slideWindowData.getEstimateResponse();
             int afterWarmup = getWeight(invoker, invocation);
             weights[i] = afterWarmup;
             // Same as LeastActiveLoadBalance
@@ -77,11 +126,22 @@ public class ShortestResponseLoadBalance extends AbstractLoadBalance {
                 shortestIndexes[shortestCount++] = i;
                 totalWeight += afterWarmup;
                 if (sameWeight && i > 0
-                        && afterWarmup != firstWeight) {
+                    && afterWarmup != firstWeight) {
                     sameWeight = false;
                 }
             }
         }
+
+        if (System.currentTimeMillis() - lastUpdateTime > SLIDE_PERIOD
+            && onResetSlideWindow.compareAndSet(false, true)) {
+            //reset slideWindowData in async way
+            SlideWindowData.EXECUTOR_SERVICE.execute(() -> {
+                methodMap.values().forEach(SlideWindowData::reset);
+                lastUpdateTime = System.currentTimeMillis();
+                onResetSlideWindow.set(false);
+            });
+        }
+
         if (shortestCount == 1) {
             return invokers.get(shortestIndexes[0]);
         }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
index 86814ae..cfc317f 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ShortestResponseLoadBalanceTest.java
@@ -16,14 +16,26 @@
  */
 package org.apache.dubbo.rpc.cluster.loadbalance;
 
+import org.apache.dubbo.common.utils.ReflectUtils;
 import org.apache.dubbo.rpc.Invoker;
-
+import org.apache.dubbo.rpc.RpcStatus;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest {
 
     @Test
+    @Order(0)
     public void testSelectByWeight() {
         int sumInvoker1 = 0;
         int sumInvoker2 = 0;
@@ -50,4 +62,50 @@ public class ShortestResponseLoadBalanceTest extends LoadBalanceBaseTest{
 
         Assertions.assertEquals(sumInvoker1 + sumInvoker2, loop, "select failed!");
     }
+
+    @Test
+    @Order(1)
+    public void testSelectByResponse() throws NoSuchFieldException, IllegalAccessException {
+        int sumInvoker1 = 0;
+        int sumInvoker2 = 0;
+        int sumInvoker5 = 0;
+        int loop = 10000;
+
+        //active -> 0
+        RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true);
+        ShortestResponseLoadBalance lb = new ShortestResponseLoadBalance();
+
+        //reset slideWindow
+        Field lastUpdateTimeField = ReflectUtils.forName(ShortestResponseLoadBalance.class.getName()).getDeclaredField("lastUpdateTime");
+        lastUpdateTimeField.setAccessible(true);
+        lastUpdateTimeField.setLong(lb, System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31));
+        lb.select(weightInvokersSR, null, weightTestInvocation);
+
+        for (int i = 0; i < loop; i++) {
+            Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);
+
+            if (selected.getUrl().getProtocol().equals("test1")) {
+                sumInvoker1++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test2")) {
+                sumInvoker2++;
+            }
+
+            if (selected.getUrl().getProtocol().equals("test5")) {
+                sumInvoker5++;
+            }
+        }
+        Map<Invoker<LoadBalanceBaseTest>, Integer> weightMap = weightInvokersSR.stream()
+                .collect(Collectors.toMap(Function.identity(), e -> Integer.valueOf(e.getUrl().getParameter("weight"))));
+        Integer totalWeight = weightMap.values().stream().reduce(0, Integer::sum);
+        // max deviation 10%
+        int expectWeightValue = loop / totalWeight;
+        int maxDeviation = expectWeightValue / 10;
+
+        Assertions.assertEquals(sumInvoker1 + sumInvoker2 + sumInvoker5, loop, "select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker1 / weightMap.get(weightInvoker1) - expectWeightValue) < maxDeviation, "select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker2 / weightMap.get(weightInvoker2) - expectWeightValue) < maxDeviation, "select failed!");
+        Assertions.assertTrue(Math.abs(sumInvoker5 / weightMap.get(weightInvoker5) - expectWeightValue) < maxDeviation, "select failed!");
+    }
 }