You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by "jasonjoo2010 (GitHub)" <gi...@apache.org> on 2018/10/13 11:33:44 UTC
[GitHub] [incubator-dubbo] jasonjoo2010 commented on issue #2578: The
current RoundRobinLoadBalance implementation performance is not satisfactory
> Hi,
> The `RoundRobinLoadBalance Algorithm` is not smoothing.
> For example, assuming that the weight of **Server A** is five, the weight of **Server B** is one, the weight of **Server C** is one.
> When calling the service seven times, the responding order is **[A ,A, A, A, A, B, C]**.
> By this responding order, we can find that the first five callings point to Server A, it will cause Server A to be overloaded in a short period of time.
> Ideally, the call order would be **[A ,A, B, A, C, A, A]**.
> So, I think there may be a better way to do this.
Hi, all
I have another implementation of SMOOTH WRR including concurrent locking, test code is below:
```java
package com.yoloho.rocketmq.consumers.demo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.common.utils.AtomicPositiveInteger;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
public final class DemoDubbo {
/**
* 假设该接口有10个可用的Invoker
*/
private static final int[] INVOKER_WEIGHT_ARRAY = new int[]{100, 100, 200, 200, 300, 300, 400, 400, 500, 500};
private static final String SERVICE_KEY = "com.test.Test.testMethod";
private static final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
private static final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
public static void main(String[] args) {
AtomicInteger a;
int times = 1000000;
int[] selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
long start = System.nanoTime();
sequences.clear();
weightSequences.clear();
while (times-- > 0) {
int select = currentSelect();
selectArray[select]++;
}
System.out.println("最新dubbo的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
System.out.println("最新dubbo的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));
times = 1000000;
selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
start = System.nanoTime();
sequences.clear();
weightSequences.clear();
while (times-- > 0) {
int select = oldSelect();
selectArray[select]++;
}
System.out.println("dubbo-2.5.3的RoundRobinLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
System.out.println("dubbo-2.5.3的RoundRobinLoadBalance流量分布:" + JSON.toJSONString(selectArray));
times = 1000000;
selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
start = System.nanoTime();
sequences.clear();
weightSequences.clear();
while (times-- > 0) {
int select = oldRandomSelect();
selectArray[select]++;
}
System.out.println("dubbo-2.5.3的RandomLoadBalance耗时:" + (System.nanoTime() - start) / 1000000);
System.out.println("dubbo-2.5.3的RandomLoadBalance流量分布:" + JSON.toJSONString(selectArray));
times = 1000000;
selectArray = new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
start = System.nanoTime();
while (times-- > 0) {
int select = wrrSelect();
selectArray[select]++;
}
System.out.println("smooth的wrr耗时:" + (System.nanoTime() - start) / 1000000);
System.out.println("smooth的wrr流量分布:" + JSON.toJSONString(selectArray));
}
/**
* 当前最新版本dubbo master分支中实现方式
*
* @return 选择的invoker的index
*/
private static int currentSelect() {
// 为了测试方便,key默认写死
String key = SERVICE_KEY;
// invoker默认是10个
int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Integer, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Integer, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(i);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
invokerToWeightMap.put(i, new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
for (Map.Entry<Integer, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Integer k = each.getKey();
final IntegerWrapper v = each.getValue();
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// Round robin
return currentSequence % length;
}
/**
* 2.5.3版本的roundrobin方式
*
* @return
*/
private static int oldSelect() {
// 为了测试方便,key默认写死
String key = SERVICE_KEY;
// invoker默认是10个
int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
List<Integer> invokers = Lists.newArrayList();
int maxWeight = 0; // 最大权重
int minWeight = Integer.MAX_VALUE; // 最小权重
for (int i = 0; i < length; i++) {
int weight = getWeight(i);
maxWeight = Math.max(maxWeight, weight); // 累计最大权重
minWeight = Math.min(minWeight, weight); // 累计最小权重
}
if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
AtomicPositiveInteger weightSequence = weightSequences.get(key);
if (weightSequence == null) {
weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
weightSequence = weightSequences.get(key);
}
int currentWeight = weightSequence.getAndIncrement() % maxWeight;
List<Integer> weightInvokers = new ArrayList<Integer>();
for (int i = 0; i < length; i++) { // 筛选权重大于当前权重基数的Invoker
if (getWeight(i) > currentWeight) {
weightInvokers.add(i);
}
}
int weightLength = weightInvokers.size();
if (weightLength == 1) {
return weightInvokers.get(0);
} else if (weightLength > 1) {
invokers = weightInvokers;
length = invokers.size();
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
// 取模轮循
return invokers.get(sequence.getAndIncrement() % length);
}
/**
* 2.5.3版本的random方式
*
* @return
*/
private static int oldRandomSelect() {
// 为了测试方便,key默认写死
String key = SERVICE_KEY;
// invoker默认是10个
int length = INVOKER_WEIGHT_ARRAY.length; // Number of invokers
int totalWeight = 0; // 总权重
boolean sameWeight = true; // 权重是否都一样
for (int i = 0; i < length; i++) {
int weight = getWeight(i);
totalWeight += weight; // 累计总权重
if (sameWeight && i > 0
&& weight != getWeight(i - 1)) {
sameWeight = false; // 计算所有权重是否一样
}
}
if (totalWeight > 0 && !sameWeight) {
// 如果权重不相同且权重大于0则按总权重数随机
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < length; i++) {
offset -= getWeight(i);
if (offset < 0) {
return i;
}
}
}
// 如果权重相同或权重为0则均等随机
return ThreadLocalRandom.current().nextInt(length);
}
private static int getWeight(int invokerIndex) {
return INVOKER_WEIGHT_ARRAY[invokerIndex];
}
private static final class IntegerWrapper {
private int value;
public IntegerWrapper(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
public void decrement() {
this.value--;
}
}
private static class WeightedRoundRobin {
private int weight;
private int current;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
public void setCurrent(int current) {
this.current = current;
}
public int increaseWeight() {
current += weight;
return current;
}
public void sel(int total) {
current -= total;
}
}
private final static ConcurrentMap<String, Map<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, Map<String, WeightedRoundRobin>>();
private static Map<String, WeightedRoundRobin> getEntry(String key) {
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new HashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
return map;
}
private static int wrrSelect() {
// 为了测试方便,key默认写死
String key = SERVICE_KEY;
Map<String, WeightedRoundRobin> map = getEntry(key);
synchronized (map) {
int totalWeight = 0;
int maxCurrent = Integer.MIN_VALUE;
int selectedInvoker = -1;
WeightedRoundRobin selectedWRR = null;
for (int i = 0; i < INVOKER_WEIGHT_ARRAY.length; i++) {
String addr = "server:" + i;
WeightedRoundRobin weightedRoundRobin = map.get(addr);
int weight = INVOKER_WEIGHT_ARRAY[i];
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setCurrent(0);
weightedRoundRobin.setWeight(weight);
map.put(addr, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setCurrent(0);
weightedRoundRobin.setWeight(weight);
}
int cur = weightedRoundRobin.increaseWeight();
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = i;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (selectedInvoker >= 0) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
}
return -1;
}
}
```
Result:
```
最新dubbo的RoundRobinLoadBalance耗时:8975
最新dubbo的RoundRobinLoadBalance流量分布:[33400,33400,66700,66700,100000,100000,133300,133300,166600,166600]
dubbo-2.5.3的RoundRobinLoadBalance耗时:152
dubbo-2.5.3的RoundRobinLoadBalance流量分布:[20000,20000,45000,45000,78333,78333,128333,128333,228334,228334]
dubbo-2.5.3的RandomLoadBalance耗时:45
dubbo-2.5.3的RandomLoadBalance流量分布:[33279,33289,67213,66143,99651,99970,133518,133463,167300,166174]
smooth的wrr耗时:390
smooth的wrr流量分布:[33333,33333,66667,66667,100000,100000,133333,133333,166667,166667]
```
Is it acceptable?
[ Full content available at: https://github.com/apache/incubator-dubbo/issues/2578 ]
This message was relayed via gitbox.apache.org for notifications@dubbo.apache.org