You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/28 09:53:23 UTC

[dubbo] branch master updated: [Dubbo-5961] Improve consistent hashing load balancing with a new algorithm #5989 (#8916)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 11e04fb  [Dubbo-5961] Improve consistent hashing load balancing with a new algorithm #5989 (#8916)
11e04fb is described below

commit 11e04fb6d38f75e2ca9b2bb6a11e79993521dd26
Author: Lei Jiang <53...@users.noreply.github.com>
AuthorDate: Tue Sep 28 17:53:11 2021 +0800

    [Dubbo-5961] Improve consistent hashing load balancing with a new algorithm #5989 (#8916)
    
    * Improve ConsistentHashLoadBalance
    
    * Modify test case
    
    * Modify code
    
    * Fix concurrency problems
    
    * Modify code
---
 .../loadbalance/ConsistentHashLoadBalance.java     | 88 +++++++++++++++++++++-
 .../loadbalance/ConsistentHashLoadBalanceTest.java | 38 +++++-----
 2 files changed, 106 insertions(+), 20 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
index 15f65b3..3b9ae55 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
@@ -21,11 +21,13 @@ import org.apache.dubbo.common.io.Bytes;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.support.RpcUtils;
+
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
 
@@ -53,7 +55,7 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
         String methodName = RpcUtils.getMethodName(invocation);
         String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
         // using the hashcode of list to compute the hash only pay attention to the elements in the list
-        int invokersHashCode = invokers.hashCode();
+        int invokersHashCode = getCorrespondingHashCode(invokers);
         ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
         if (selector == null || selector.identityHashCode != invokersHashCode) {
             selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
@@ -62,6 +64,16 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
         return selector.select(invocation);
     }
 
+    /**
+     * Get the hash code of the invokers.
+     * This method is public so that it can be used in test cases.
+     * @param invokers the invokers to hash
+     * @return the hash code of the invokers list
+     */
+    public <T> int getCorrespondingHashCode(List<Invoker<T>> invokers){
+        return invokers.hashCode();
+    }
+
     private static final class ConsistentHashSelector<T> {
 
         private final TreeMap<Long, Invoker<T>> virtualInvokers;
@@ -72,6 +84,29 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
 
         private final int[] argumentIndex;
 
+        /**
+         * key: server(invoker) address
+         * value: count of requests accepted by that server
+         */
+        private Map<String, AtomicLong> serverRequestCountMap = new ConcurrentHashMap<>();
+
+        /**
+         * total count of requests accepted by all servers
+         */
+        private AtomicLong totalRequestCount;
+
+        /**
+         * count of current servers (invokers)
+         */
+        private int serverCount;
+
+        /**
+         * the maximum allowed ratio between the request count of a single
+         * server and the average request count (totalRequestCount/serverCount).
+         * 1.5 is recommended; in the future this could be made configurable.
+         */
+        private static final double OVERLOAD_RATIO_THREAD = 1.5F;
+
         ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
             this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
             this.identityHashCode = identityHashCode;
@@ -92,6 +127,10 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
                     }
                 }
             }
+
+            totalRequestCount = new AtomicLong(0);
+            serverCount = invokers.size();
+            serverRequestCountMap.clear();
         }
 
         public Invoker<T> select(Invocation invocation) {
@@ -115,9 +154,56 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
             if (entry == null) {
                 entry = virtualInvokers.firstEntry();
             }
+
+            String serverAddress = entry.getValue().getUrl().getAddress();
+
+            /**
+             * The following code selects a suitable invoker.
+             * It is not completely thread-safe.
+             * However, for consumer-side load balancing, thread races
+             * in this code will rarely occur, because it performs no IO
+             * or network operations and its execution time is very short.
+             * Even in the extreme case, assigning a few requests to an
+             * invoker whose load is above the OVERLOAD_RATIO_THREAD limit
+             * will not significantly affect the effectiveness of this
+             * algorithm. Making this code synchronized would reduce the
+             * efficiency of every request, which in my opinion is not
+             * worth it. So the lack of complete thread safety here is
+             * acceptable.
+             */
+            double overloadThread = ((double) totalRequestCount.get() / (double) serverCount) * OVERLOAD_RATIO_THREAD;
+            /**
+             * Find a valid server node, i.e. one that either:
+             * 1. has not accepted any request yet,
+             * or
+             * 2. is not overloaded (accepted request count < threshold, where threshold = average request count * overloadRatioAllowed).
+             */
+            while (serverRequestCountMap.containsKey(serverAddress)
+                    && serverRequestCountMap.get(serverAddress).get() >= overloadThread) {
+                /**
+                 * If the server node is not valid, get the next node
+                 */
+                entry = getNextInvokerNode(virtualInvokers, entry);
+                serverAddress = entry.getValue().getUrl().getAddress();
+            }
+            if (!serverRequestCountMap.containsKey(serverAddress)) {
+                serverRequestCountMap.put(serverAddress, new AtomicLong(1));
+            } else {
+                serverRequestCountMap.get(serverAddress).incrementAndGet();
+            }
+            totalRequestCount.incrementAndGet();
+
             return entry.getValue();
         }
 
+        private Map.Entry<Long, Invoker<T>> getNextInvokerNode(TreeMap<Long, Invoker<T>> virtualInvokers, Map.Entry<Long, Invoker<T>> entry){
+            Map.Entry<Long, Invoker<T>> nextEntry = virtualInvokers.higherEntry(entry.getKey());
+            if(nextEntry == null){
+                return virtualInvokers.firstEntry();
+            }
+            return nextEntry;
+        }
+
         private long hash(byte[] digest, int number) {
             return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                     | ((long) (digest[2 + number * 4] & 0xFF) << 16)
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
index 56b2c14..a73483d 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalanceTest.java
@@ -18,12 +18,10 @@ package org.apache.dubbo.rpc.cluster.loadbalance;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.cluster.LoadBalance;
 import org.apache.dubbo.rpc.cluster.RouterChain;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -31,42 +29,44 @@ import java.util.concurrent.atomic.AtomicLong;
 @SuppressWarnings("rawtypes")
 public class ConsistentHashLoadBalanceTest extends LoadBalanceBaseTest {
 
+    /**
+     * Test case for hot key
+     * https://github.com/apache/dubbo/issues/4103
+     * Invokes the method with the same parameters 10000 times and checks
+     * that the count of requests accepted by each invoker is less than
+     * (overloadRatioAllowed * average + 1).
+     *
+     */
     @Test
     public void testConsistentHashLoadBalance() {
         int runs = 10000;
-        long unHitedInvokerCount = 0;
-        Map<Invoker, Long> hitedInvokers = new HashMap<>();
         Map<Invoker, AtomicLong> counter = getInvokeCounter(runs, ConsistentHashLoadBalance.NAME);
-        for (Invoker minvoker : counter.keySet()) {
-            Long count = counter.get(minvoker).get();
-
-            if (count == 0) {
-                unHitedInvokerCount++;
-            } else {
-                hitedInvokers.put(minvoker, count);
-            }
+        double overloadRatioAllowed = 1.5F;
+        int serverCount = counter.size();
+        double overloadThread = ((double) runs * overloadRatioAllowed)/((double) serverCount);
+        for (Invoker invoker : counter.keySet()) {
+            Long count = counter.get(invoker).get();
+            Assertions.assertTrue(count < (overloadThread + 1L),
+                    "count of request accept by each invoker will not be higher than (overloadRatioAllowed * average + 1)");
         }
 
-        Assertions.assertEquals(counter.size() - 1,
-                unHitedInvokerCount, "the number of unHitedInvoker should be counter.size() - 1");
-        Assertions.assertEquals(1, hitedInvokers.size(), "the number of hitedInvoker should be 1");
-        Assertions.assertEquals(runs,
-                hitedInvokers.values().iterator().next().intValue(), "the number of hited count should be the number of runs");
     }
 
     // https://github.com/apache/dubbo/issues/5429
     @Test
     void testNormalWhenRouterEnabled() {
-        LoadBalance lb = getLoadBalance(ConsistentHashLoadBalance.NAME);
+        ConsistentHashLoadBalance lb = (ConsistentHashLoadBalance) getLoadBalance(ConsistentHashLoadBalance.NAME);
         URL url = invokers.get(0).getUrl();
         RouterChain<LoadBalanceBaseTest> routerChain = RouterChain.buildChain(url);
         Invoker<LoadBalanceBaseTest> result = lb.select(invokers, url, invocation);
+        int originalHashCode = lb.getCorrespondingHashCode(invokers);
 
         for (int i = 0; i < 100; i++) {
             routerChain.setInvokers(invokers);
             List<Invoker<LoadBalanceBaseTest>> routeInvokers = routerChain.route(url, invocation);
             Invoker<LoadBalanceBaseTest> finalInvoker = lb.select(routeInvokers, url, invocation);
-            Assertions.assertEquals(result, finalInvoker);
+
+            Assertions.assertEquals(originalHashCode, lb.getCorrespondingHashCode(routeInvokers));
         }
     }
 }