You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by "manzhizhen (GitHub)" <gi...@apache.org> on 2018/09/28 05:28:15 UTC

[GitHub] [incubator-dubbo] manzhizhen opened issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

- [ ] I have searched the [issues](https://github.com/apache/incubator-dubbo/issues) of this repository and believe that this is not a duplicate.
- [ ] I have checked the [FAQ](https://github.com/apache/incubator-dubbo/blob/master/FAQ.md) of this repository and believe that this is not a duplicate.

### Environment

* Dubbo version: newest master and 2.5.3
* Operating System version: mac
* Java version: 1.8

### Steps to reproduce this issue
Our company currently uses dubbo-2.5.3, LoadBalance chooses roundrobin. After the weight adjustment is made, the distribution of traffic is not in line with expectations. So I saw the implementation of RoundRobinLoadBalance of 2.5.3, which logically led to the distribution of traffic. Not in line with expectations, so I read the latest dubbo version of the implementation, found that the latest version of RoundRobinLoadBalance implementation traffic distribution is in line with expectations, but the performance is not satisfactory.


Test Code:


```
package com.manzhizhen.study.loadbalance;

import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by yizhenqiang on 18/9/26.
 */
public class DubboRoundRobinLoadBalance {

    /**
     * 假设该接口有10个可用的Invoker
     */
    private static final int INVOKER_SIZE = 10;
    private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500};

    private static final String SERVICE_KEY = "com.test.Test.testMethod";

    private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private static final ConcurrentMap<String, AtomicPositiveInteger> sequences1 = new ConcurrentHashMap<String, AtomicPositiveInteger>();
    private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();


    public static void main(String[] args) {
        int times = 1000000;
        int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        long start = System.nanoTime();
        while (times-- > 0) {
            int select = currentSelect();
            selectArray[select]++;
        }

        System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        while (times-- > 0) {
            int select = oldSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        while (times-- > 0) {
            int select = oldRandomSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray));

    }

    /**
     * 当前最新版本dubbo master分支中实现方式
     *
     * @return 选择的invoker的index
     */
    private static int currentSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_SIZE; // Number of invokers

        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(i, new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Integer k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return currentSequence % length;
    }

    /**
     * 2.5.3版本的roundrobin方式
     *
     * @return
     */
    private static int oldSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_SIZE; // Number of invokers

        List<Integer> invokers = Lists.newArrayList();

        int maxWeight = 0; // 最大权重
        int minWeight = Integer.MAX_VALUE; // 最小权重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // 累计最大权重
            minWeight = Math.min(minWeight, weight); // 累计最小权重
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Integer> weightInvokers = new ArrayList<Integer>();
            for (int i = 0; i < INVOKER_SIZE; i++) { // 筛选权重大于当前权重基数的Invoker
                if (getWeight(i) > currentWeight) {
                    weightInvokers.add(i);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences1.get(key);
        if (sequence == null) {
            sequences1.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences1.get(key);
        }
        // 取模轮循
        return invokers.get(sequence.getAndIncrement() % length);
    }

    /**
     * 2.5.3版本的random方式
     *
     * @return
     */
    private static int oldRandomSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_SIZE; // Number of invokers
        int totalWeight = 0; // 总权重

        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(i - 1)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(i);
                if (offset < 0) {
                    return i;
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return ThreadLocalRandom.current().nextInt(length);
    }

    private static int getWeight(int invokerIndex) {
        return INVOKER_WEIGHT_ARRAY[invokerIndex];
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }
    }

}

```

```
最新dubbo的RoundRobinLoadBalance耗时:9291
最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600]
dubbo-2.5.3的RoundRobinLoadBalance耗时:158
dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334]
dubbo-2.5.3的RandomLoadBalance耗时:39
dubbo-2.5.3的RandomLoadBalance流量分布:[33261,33660,66804,66524,99586,99774,133574,133397,166703,166717]
```

In the latest version of the RoundRobinLoadBalance implementation, a large number of for loops are used to do the minus one operation. This is not optimistic when the weight distribution is relatively large (hundreds of thousands of weights).

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org

[GitHub] [incubator-dubbo] kimmking closed issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
[ issue closed by kimmking ]

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] jasonjoo2010 commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "jasonjoo2010 (GitHub)" <gi...@apache.org>.
> Hi,
> The `RoundRobinLoadBalance Algorithm` is not smoothing.
> For example, assuming that the weight of **Server A** is five, the weight of **Server B** is one, the weight of **Server C** is one.
> When calling the service seven times, the responding order is **[A ,A, A, A, A, B, C]**.
> By this responding order, we can find that the first five callings point to Server A, it will cause Server A to be overloaded in a short period of time.
> Ideally, the call order would be **[A ,A, B, A, C, A, A]**.
> So, I think there may be a better way to do this.

Hi, all

I have another implementation of SMOOTH WRR including concurrent locking, test code is below:

```java
package com.yoloho.rocketmq.consumers.demo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

public final class DemoDubbo {
    /**
     * 假设该接口有10个可用的Invoker
     */
    private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500};

    private static final String SERVICE_KEY = "com.test.Test.testMethod";

    private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();


    public static void main(String[] args) {
        AtomicInteger a;
        int times = 1000000;
        int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        long start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = currentSelect();
            selectArray[select]++;
        }

        System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldRandomSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray));
        
        
        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        while (times-- > 0) {
            int select = wrrSelect();
            selectArray[select]++;
        }

        System.out.println("smooth的wrr耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("smooth的wrr流量分布:" + JSON.toJSONString(selectArray));

    }

    /**
     * 当前最新版本dubbo master分支中实现方式
     *
     * @return 选择的invoker的index
     */
    private static int currentSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(i, new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Integer k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return currentSequence % length;
    }

    /**
     * 2.5.3版本的roundrobin方式
     *
     * @return
     */
    private static int oldSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        List<Integer> invokers = Lists.newArrayList();

        int maxWeight = 0; // 最大权重
        int minWeight = Integer.MAX_VALUE; // 最小权重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // 累计最大权重
            minWeight = Math.min(minWeight, weight); // 累计最小权重
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Integer> weightInvokers = new ArrayList<Integer>();
            for (int i = 0; i < length; i++) { // 筛选权重大于当前权重基数的Invoker
                if (getWeight(i) > currentWeight) {
                    weightInvokers.add(i);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 取模轮循
        return invokers.get(sequence.getAndIncrement() % length);
    }

    /**
     * 2.5.3版本的random方式
     *
     * @return
     */
    private static int oldRandomSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
        int totalWeight = 0; // 总权重

        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(i - 1)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(i);
                if (offset < 0) {
                    return i;
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return ThreadLocalRandom.current().nextInt(length);
    }

    private static int getWeight(int invokerIndex) {
        return INVOKER_WEIGHT_ARRAY[invokerIndex];
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }
    }
    
    
    private static class WeightedRoundRobin {
        private int weight;
        private int current;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
        }
        public void setCurrent(int current) {
            this.current = current;
        }
        public int increaseWeight() {
            current += weight;
            return current;
        }
        public void sel(int total) {
            current -= total;
        }
    }
    
    private final static ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>();
    
    private static Map<String, WeightedRoundRobin> getEntry(String key) {
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        return map;
    }

    private static int wrrSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        Map<String, WeightedRoundRobin> map = getEntry(key);
        synchronized (map) {
            int totalWeight = 0;
            int maxCurrent = Integer.MIN_VALUE;
            int selectedInvoker = -1;
            WeightedRoundRobin selectedWRR = null;
            for (int i = 0; i < INVOKER_WEIGHT_ARRAY.length; i++) {
                String addr = "server:" + i;
                WeightedRoundRobin weightedRoundRobin = map.get(addr);
                int weight = INVOKER_WEIGHT_ARRAY[i];
                if (weight < 0) {
                    weight = 0;
                }
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                    map.put(addr, weightedRoundRobin);
                }
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                }
                int cur = weightedRoundRobin.increaseWeight();
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = i;
                    selectedWRR = weightedRoundRobin;
                }
                totalWeight += weight;
            }
            if (selectedInvoker >= 0) {
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
        }
        return -1;
    }
}
```

Result:

```
最新dubbo的RoundRobinLoadBalance耗时:8975
最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600]
dubbo-2.5.3的RoundRobinLoadBalance耗时:152
dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334]
dubbo-2.5.3的RandomLoadBalance耗时:45
dubbo-2.5.3的RandomLoadBalance流量分布:[33279,33289,67213,66143,99651,99970,133518,133463,167300,166174]
smooth的wrr耗时:390
smooth的wrr流量分布:[33333,33333,66667,66667,100000,100000,133333,133333,166667,166667]
```

Is it acceptable?

Time cost: **390** ms
Code from nginx source code.
And a cleaner block should be added to clean offline nodes in map(typically not performance sensitive).

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org

[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
I will check it today

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
If we modify a weight to 50000 :
```
private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 50000};
```

then it need 10minutes to finish select.


[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
The second code snippet works well.
@gudegg you can submit a PR for it, I will submit my test case.

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
it's good way.

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
In your case 1:
Invoking currentSelect-method 1M times lead to 1833832000 times calc, cause:
```
for (int i = 0; i < maxWeight; i++) { // this is the reason
     for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    ....
     }
}
```

If we modify the weight array to {100, 100, 200, 200, 300, 300, 400, 400, 500, 5000};
It need 60+s to finish it.

So we must optimize this RR algorithm.

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
In your case 1:
Invoking currentSelect-method 1M times lead to 1833832000 times calc, cause:
```
for (int i = 0; i < maxWeight; i++) { // this is the reason
     for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    ....
     }
}
```

If we modify the weight array to {100, 100, 200, 200, 300, 300, 400, 400, 500, 50000};
It need 900s to finish it.

So we must optimize this RR algorithm.

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] jasonjoo2010 commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "jasonjoo2010 (GitHub)" <gi...@apache.org>.
> Hi,
> The `RoundRobinLoadBalance Algorithm` is not smoothing.
> For example, assuming that the weight of **Server A** is five, the weight of **Server B** is one, the weight of **Server C** is one.
> When calling the service seven times, the responding order is **[A ,A, A, A, A, B, C]**.
> By this responding order, we can find that the first five callings point to Server A, it will cause Server A to be overloaded in a short period of time.
> Ideally, the call order would be **[A ,A, B, A, C, A, A]**.
> So, I think there may be a better way to do this.

Hi, all

I have another implementation of SMOOTH WRR including concurrent locking, test code is below:

```java
package com.yoloho.rocketmq.consumers.demo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

public final class DemoDubbo {
    /**
     * 假设该接口有10个可用的Invoker
     */
    private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500};

    private static final String SERVICE_KEY = "com.test.Test.testMethod";

    private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();


    public static void main(String[] args) {
        AtomicInteger a;
        int times = 1000000;
        int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        long start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = currentSelect();
            selectArray[select]++;
        }

        System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldRandomSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray));
        
        
        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        while (times-- > 0) {
            int select = wrrSelect();
            selectArray[select]++;
        }

        System.out.println("smooth的wrr耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("smooth的wrr流量分布:" + JSON.toJSONString(selectArray));

    }

    /**
     * 当前最新版本dubbo master分支中实现方式
     *
     * @return 选择的invoker的index
     */
    private static int currentSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(i, new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Integer k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return currentSequence % length;
    }

    /**
     * 2.5.3版本的roundrobin方式
     *
     * @return
     */
    private static int oldSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        List<Integer> invokers = Lists.newArrayList();

        int maxWeight = 0; // 最大权重
        int minWeight = Integer.MAX_VALUE; // 最小权重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // 累计最大权重
            minWeight = Math.min(minWeight, weight); // 累计最小权重
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Integer> weightInvokers = new ArrayList<Integer>();
            for (int i = 0; i < length; i++) { // 筛选权重大于当前权重基数的Invoker
                if (getWeight(i) > currentWeight) {
                    weightInvokers.add(i);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 取模轮循
        return invokers.get(sequence.getAndIncrement() % length);
    }

    /**
     * 2.5.3版本的random方式
     *
     * @return
     */
    private static int oldRandomSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
        int totalWeight = 0; // 总权重

        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(i - 1)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(i);
                if (offset < 0) {
                    return i;
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return ThreadLocalRandom.current().nextInt(length);
    }

    private static int getWeight(int invokerIndex) {
        return INVOKER_WEIGHT_ARRAY[invokerIndex];
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }
    }
    
    
    private static class WeightedRoundRobin {
        private int weight;
        private int current;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
        }
        public void setCurrent(int current) {
            this.current = current;
        }
        public int increaseWeight() {
            current += weight;
            return current;
        }
        public void sel(int total) {
            current -= total;
        }
    }
    
    private final static ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>();
    
    private static Map<String, WeightedRoundRobin> getEntry(String key) {
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        return map;
    }

    private static int wrrSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        Map<String, WeightedRoundRobin> map = getEntry(key);
        synchronized (map) {
            int totalWeight = 0;
            int maxCurrent = Integer.MIN_VALUE;
            int selectedInvoker = -1;
            WeightedRoundRobin selectedWRR = null;
            for (int i = 0; i < INVOKER_WEIGHT_ARRAY.length; i++) {
                String addr = "server:" + i;
                WeightedRoundRobin weightedRoundRobin = map.get(addr);
                int weight = INVOKER_WEIGHT_ARRAY[i];
                if (weight < 0) {
                    weight = 0;
                }
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                    map.put(addr, weightedRoundRobin);
                }
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                }
                int cur = weightedRoundRobin.increaseWeight();
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = i;
                    selectedWRR = weightedRoundRobin;
                }
                totalWeight += weight;
            }
            if (selectedInvoker >= 0) {
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
        }
        return -1;
    }
}
```

Result:

```
最新dubbo的RoundRobinLoadBalance耗时:8975
最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600]
dubbo-2.5.3的RoundRobinLoadBalance耗时:152
dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334]
dubbo-2.5.3的RandomLoadBalance耗时:45
dubbo-2.5.3的RandomLoadBalance流量分布:[33279,33289,67213,66143,99651,99970,133518,133463,167300,166174]
smooth的wrr耗时:390
smooth的wrr流量分布:[33333,33333,66667,66667,100000,100000,133333,133333,166667,166667]
```

Is it acceptable?

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org

[GitHub] [incubator-dubbo] gudegg commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "gudegg (GitHub)" <gi...@apache.org>.
@kimmking The above implementation is also very bad when the greatest common divisor is 1. This performance will be much better. Help me to see if there are any problems. thx!
```java
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final List<Invoker<T>> invokerToWeightList = new ArrayList<>();
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightList.add(invokers.get(i));
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        AtomicPositiveInteger indexSeq = indexSeqs.get(key);
        if (indexSeq == null) {
            indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
            indexSeq = indexSeqs.get(key);
        }

        if (maxWeight > 0 && minWeight < maxWeight) {
            length = invokerToWeightList.size();
            while (true) {
                int index = indexSeq.incrementAndGet() % length;
                int currentWeight = sequence.get() % maxWeight;
                if (index == 0) {
                    currentWeight = sequence.incrementAndGet() % maxWeight;
                }
                if (getWeight(invokerToWeightList.get(index), invocation) > currentWeight) {
                    return invokerToWeightList.get(index);
                }
            }
        }
        // Round robin
        return invokers.get(sequence.incrementAndGet() % length);
    }
}
```

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] shuaijunlan commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "shuaijunlan (GitHub)" <gi...@apache.org>.
Hi, 
The `RoundRobinLoadBalance Algorithm` is not smoothing.
For example, assuming that the weight of **Server A** is five, the weight of **Server B** is one, the weight of **Server C** is one.
When calling the service seven times, the responding order is **[A ,A, A, A, A, B, C]**.
By this responding order, we can find that the first five callings point to Server A, it will cause Server A to be overloaded in a short period of time.
Ideally, the call order would be  **[A ,A, B, A, C, A, A]**.
So, I think there may be a better way to do this.

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] jasonjoo2010 commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "jasonjoo2010 (GitHub)" <gi...@apache.org>.
> Hi,
> The `RoundRobinLoadBalance Algorithm` is not smoothing.
> For example, assuming that the weight of **Server A** is five, the weight of **Server B** is one, the weight of **Server C** is one.
> When calling the service seven times, the responding order is **[A ,A, A, A, A, B, C]**.
> By this responding order, we can find that the first five callings point to Server A, it will cause Server A to be overloaded in a short period of time.
> Ideally, the call order would be **[A ,A, B, A, C, A, A]**.
> So, I think there may be a better way to do this.

Hi, all

I have another implementation of SMOOTH WRR including concurrent locking.

What is `SMOOTH`:
A:B:C = 1:2:3
[A, B, C, B, C, C,      A, B, C, B, C, C,        A, B, C, B, C, C,           A, B, C, B, C, C, ......]

And it's more smooth especially the factor number is large as: [90 : 100 : 111]


test code is below:

```java
package com.yoloho.rocketmq.consumers.demo;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;

public final class DemoDubbo {
    /**
     * 假设该接口有10个可用的Invoker
     */
    private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500};

    private static final String SERVICE_KEY = "com.test.Test.testMethod";

    private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();


    public static void main(String[] args) {
        AtomicInteger a;
        int times = 1000000;
        int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        long start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = currentSelect();
            selectArray[select]++;
        }

        System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));

        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        sequences.clear();
        weightSequences.clear();
        while (times-- > 0) {
            int select = oldRandomSelect();
            selectArray[select]++;
        }

        System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray));
        
        
        times = 1000000;
        selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
        start = System.nanoTime();
        while (times-- > 0) {
            int select = wrrSelect();
            selectArray[select]++;
        }

        System.out.println("smooth的wrr耗时:" + (System.nanoTime() - start) / 1000000);
        System.out.println("smooth的wrr流量分布:" + JSON.toJSONString(selectArray));

    }

    /**
     * 当前最新版本dubbo master分支中实现方式
     *
     * @return 选择的invoker的index
     */
    private static int currentSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(i, new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Integer k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return currentSequence % length;
    }

    /**
     * 2.5.3版本的roundrobin方式
     *
     * @return
     */
    private static int oldSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers

        List<Integer> invokers = Lists.newArrayList();

        int maxWeight = 0; // 最大权重
        int minWeight = Integer.MAX_VALUE; // 最小权重
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            maxWeight = Math.max(maxWeight, weight); // 累计最大权重
            minWeight = Math.min(minWeight, weight); // 累计最小权重
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Integer> weightInvokers = new ArrayList<Integer>();
            for (int i = 0; i < length; i++) { // 筛选权重大于当前权重基数的Invoker
                if (getWeight(i) > currentWeight) {
                    weightInvokers.add(i);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 取模轮循
        return invokers.get(sequence.getAndIncrement() % length);
    }

    /**
     * 2.5.3版本的random方式
     *
     * @return
     */
    private static int oldRandomSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        // invoker默认是10个
        int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
        int totalWeight = 0; // 总权重

        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(i);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(i - 1)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(i);
                if (offset < 0) {
                    return i;
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return ThreadLocalRandom.current().nextInt(length);
    }

    private static int getWeight(int invokerIndex) {
        return INVOKER_WEIGHT_ARRAY[invokerIndex];
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }
    }
    
    
    private static class WeightedRoundRobin {
        private int weight;
        private int current;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
        }
        public void setCurrent(int current) {
            this.current = current;
        }
        public int increaseWeight() {
            current += weight;
            return current;
        }
        public void sel(int total) {
            current -= total;
        }
    }
    
    private final static ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>();
    
    private static Map<String, WeightedRoundRobin> getEntry(String key) {
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        return map;
    }

    private static int wrrSelect() {
        // 为了测试方便,key默认写死
        String key = SERVICE_KEY;
        Map<String, WeightedRoundRobin> map = getEntry(key);
        synchronized (map) {
            int totalWeight = 0;
            int maxCurrent = Integer.MIN_VALUE;
            int selectedInvoker = -1;
            WeightedRoundRobin selectedWRR = null;
            for (int i = 0; i < INVOKER_WEIGHT_ARRAY.length; i++) {
                String addr = "server:" + i;
                WeightedRoundRobin weightedRoundRobin = map.get(addr);
                int weight = INVOKER_WEIGHT_ARRAY[i];
                if (weight < 0) {
                    weight = 0;
                }
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                    map.put(addr, weightedRoundRobin);
                }
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    weightedRoundRobin.setCurrent(0);
                    weightedRoundRobin.setWeight(weight);
                }
                int cur = weightedRoundRobin.increaseWeight();
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = i;
                    selectedWRR = weightedRoundRobin;
                }
                totalWeight += weight;
            }
            if (selectedInvoker >= 0) {
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
        }
        return -1;
    }
}
```

Result:

```
最新dubbo的RoundRobinLoadBalance耗时:8975
最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600]
dubbo-2.5.3的RoundRobinLoadBalance耗时:152
dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334]
dubbo-2.5.3的RandomLoadBalance耗时:45
dubbo-2.5.3的RandomLoadBalance流量分布:[33279,33289,67213,66143,99651,99970,133518,133463,167300,166174]
smooth的wrr耗时:390
smooth的wrr流量分布:[33333,33333,66667,66667,100000,100000,133333,133333,166667,166667]
```

Is it acceptable?

Time cost: **390** ms
Code from nginx source code.
And a cleaner block should be added to clean offline nodes in map(typically not performance sensitive).

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org

[GitHub] [incubator-dubbo] zonghaishang commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "zonghaishang (GitHub)" <gi...@apache.org>.
At present, it seems that only a small number of weights can be configured to avoid this problem.


[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] kimmking commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "kimmking (GitHub)" <gi...@apache.org>.
If we modify a weight to 50000 :
```
private static final int[] INVOKER_WEIGHT_ARRAY = new int[]
  {100, 100, 200, 200, 300, 300, 400, 400, 500, 50000};
```

then it need 10minutes to finish select.


[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] gudegg commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "gudegg (GitHub)" <gi...@apache.org>.
@kimmking find the greatest common divisor first?
```java
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        int gcd = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
                if (gcd != 1) {
                    gcd = gcd(weight, gcd);
                }
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % (weightSum / gcd);
            for (int i = 0; i < maxWeight / gcd; i++) {
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement(gcd);
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return invokers.get(currentSequence % length);
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }

        public void decrement(int a) {
            this.value -= a;
        }
    }

    /**
     * greatest common divisor
     */
    public static int gcd(int number1, int number2) {
        if (number2 == 0) {
            return number1;
        }
        while (true) {
            if ((number1 = number1 % number2) == 0) {
                return number2;
            }
            if ((number2 = number2 % number1) == 0) {
                return number1;
            }
        }
    }

}
```

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] gudegg commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "gudegg (GitHub)" <gi...@apache.org>.
@kimmking find the greatest common divisor first?
```java
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        int gcd = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            if (gcd != 1) {
                gcd = gcd(weight, gcd);
            }
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % (weightSum / gcd);
            for (int i = 0; i < maxWeight / gcd; i++) {
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement(gcd);
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return invokers.get(currentSequence % length);
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }

        public void decrement(int a) {
            this.value -= a;
        }
    }

    /**
     * greatest common divisor
     */
    public static int gcd(int number1, int number2) {
        if (number2 == 0) {
            return number1;
        }
        while (true) {
            if ((number1 = number1 % number2) == 0) {
                return number2;
            }
            if ((number2 = number2 % number1) == 0) {
                return number1;
            }
        }
    }

}
```

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org


[GitHub] [incubator-dubbo] gudegg commented on issue #2578: The current RoundRobinLoadBalance implementation performance is not satisfactory

Posted by "gudegg (GitHub)" <gi...@apache.org>.
@kimmking OK

[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org