You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/10/04 09:58:26 UTC
[incubator-eventmesh] branch develop updated: add weighted random
load balance selector (#526)
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new 8b6c827 add weighted random load balance selector (#526)
8b6c827 is described below
commit 8b6c82722afadc577f837adc9ed74a43e818c843
Author: horoc <63...@users.noreply.github.com>
AuthorDate: Mon Oct 4 17:58:00 2021 +0800
add weighted random load balance selector (#526)
---
.../common/loadbalance/LoadBalanceType.java | 3 +-
.../WeightRandomLoadBalanceSelector.java | 76 +++++++++++++++++++
.../WeightRandomLoadBalanceSelectorTest.java | 86 ++++++++++++++++++++++
.../client/http/conf/LiteClientConfig.java | 2 +-
.../client/http/util/HttpLoadBalanceUtils.java | 74 ++++++++++++-------
.../client/http/util/HttpLoadBalanceUtilsTest.java | 10 +++
6 files changed, 221 insertions(+), 30 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
index 858a69b..6096abd 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
@@ -21,7 +21,8 @@ package org.apache.eventmesh.common.loadbalance;
public enum LoadBalanceType {
RANDOM(0, "random load balance strategy"),
- WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy");
+ WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy"),
+ WEIGHT_RANDOM(2, "weight random load balance strategy");
private int code;
private String desc;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java
new file mode 100644
index 0000000..1fc239d
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.eventmesh.common.EventMeshException;
+
+import java.util.List;
+
+/**
+ * Selector that picks a target from a weighted list using a weighted random strategy.
+ *
+ * <p>If all the weights are the same, one element is picked uniformly at random.
+ * If the weights differ, a number is drawn with
+ * {@code RandomUtils.nextInt(0, w0 + w1 + ... + wn)} and the element whose cumulative
+ * weight range contains that number is returned, so an element is chosen with a
+ * probability proportional to its weight.
+ *
+ * @param <T> Target type
+ */
+public class WeightRandomLoadBalanceSelector<T> implements LoadBalanceSelector<T> {
+
+    private final List<Weight<T>> clusterGroup;
+
+    /** Sum of all weights in {@link #clusterGroup}; upper (exclusive) bound of the random draw. */
+    private final int totalWeight;
+
+    /** {@code true} when every element carries the same weight, enabling the uniform shortcut. */
+    private final boolean sameWeightGroup;
+
+    /**
+     * Creates a selector over the given weighted targets.
+     *
+     * @param clusterGroup weighted targets to select from, must not be empty
+     * @throws EventMeshException if {@code clusterGroup} is empty, a weight is negative,
+     *                            or the sum of the weights does not fit into an {@code int}
+     */
+    public WeightRandomLoadBalanceSelector(List<Weight<T>> clusterGroup) throws EventMeshException {
+        if (CollectionUtils.isEmpty(clusterGroup)) {
+            throw new EventMeshException("clusterGroup can not be empty");
+        }
+        // Accumulate in a long so an int overflow is detected instead of silently wrapping.
+        long weightSum = 0L;
+        boolean allSame = true;
+        int firstWeight = clusterGroup.get(0).getWeight();
+        for (Weight<T> weight : clusterGroup) {
+            int current = weight.getWeight();
+            if (current < 0) {
+                // A negative weight can make the total negative, which would only fail later in select().
+                throw new EventMeshException("weight can not be negative");
+            }
+            weightSum += current;
+            if (current != firstWeight) {
+                allSame = false;
+            }
+        }
+        if (weightSum > Integer.MAX_VALUE) {
+            throw new EventMeshException("total weight is too large");
+        }
+        this.clusterGroup = clusterGroup;
+        this.totalWeight = (int) weightSum;
+        this.sameWeightGroup = allSame;
+    }
+
+    /**
+     * Selects one target.
+     *
+     * @return the selected target; chosen by weight when the weights differ,
+     *         uniformly at random when they are all equal
+     */
+    @Override
+    public T select() {
+        if (!sameWeightGroup) {
+            // Draw a point in [0, totalWeight) and walk the list until the
+            // cumulative weight passes that point.
+            int targetWeight = RandomUtils.nextInt(0, totalWeight);
+            for (Weight<T> weight : clusterGroup) {
+                targetWeight -= weight.getWeight();
+                if (targetWeight < 0) {
+                    return weight.getTarget();
+                }
+            }
+        }
+        // All weights equal (or, defensively, no element matched above): uniform pick.
+        int length = clusterGroup.size();
+        return clusterGroup.get(RandomUtils.nextInt(0, length)).getTarget();
+    }
+
+    @Override
+    public LoadBalanceType getType() {
+        return LoadBalanceType.WEIGHT_RANDOM;
+    }
+}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java
new file mode 100644
index 0000000..5331630
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.summingInt;
+
+/**
+ * Statistical tests for {@link WeightRandomLoadBalanceSelector}: draws a large number of
+ * selections and checks that the observed distribution follows the configured weights.
+ */
+public class WeightRandomLoadBalanceSelectorTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(WeightRandomLoadBalanceSelectorTest.class);
+
+    /**
+     * With weights 10/20/40 the selection counts must be roughly in the ratio 1:2:4.
+     */
+    @Test
+    public void testSelect() throws Exception {
+        List<Weight<String>> weightList = new ArrayList<>();
+        weightList.add(new Weight<>("192.168.0.1", 10));
+        weightList.add(new Weight<>("192.168.0.2", 20));
+        weightList.add(new Weight<>("192.168.0.3", 40));
+        WeightRandomLoadBalanceSelector<String> weightRandomLoadBalanceSelector = new WeightRandomLoadBalanceSelector<>(weightList);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRandomLoadBalanceSelector.getType());
+        int testRange = 100_000;
+        // Count how many times each address is selected.
+        Map<String, Integer> addressToNum = IntStream.range(0, testRange)
+                .mapToObj(i -> weightRandomLoadBalanceSelector.select())
+                .collect(groupingBy(Function.identity(), summingInt(i -> 1)));
+
+        addressToNum.forEach((key, value) -> {
+            logger.info("{}: {}", key, value);
+        });
+        // the error less than 5%
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.2") * 2) < testRange / 20);
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.1") * 4) < testRange / 20);
+    }
+
+    /**
+     * With identical weights the selector must flag the group as same-weight and
+     * distribute the selections evenly.
+     */
+    @Test
+    public void testSameWeightSelect() throws Exception {
+        List<Weight<String>> weightList = new ArrayList<>();
+        weightList.add(new Weight<>("192.168.0.1", 10));
+        weightList.add(new Weight<>("192.168.0.2", 10));
+        weightList.add(new Weight<>("192.168.0.3", 10));
+        WeightRandomLoadBalanceSelector<String> weightRandomLoadBalanceSelector = new WeightRandomLoadBalanceSelector<>(weightList);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRandomLoadBalanceSelector.getType());
+
+        int testRange = 100_000;
+        Map<String, Integer> addressToNum = IntStream.range(0, testRange)
+                .mapToObj(i -> weightRandomLoadBalanceSelector.select())
+                .collect(groupingBy(Function.identity(), summingInt(i -> 1)));
+
+        // The flag is private, so read it reflectively to verify the same-weight shortcut is active.
+        Field field = WeightRandomLoadBalanceSelector.class.getDeclaredField("sameWeightGroup");
+        field.setAccessible(true);
+        boolean sameWeightGroup = (boolean) field.get(weightRandomLoadBalanceSelector);
+        Assert.assertTrue(sameWeightGroup);
+
+        addressToNum.forEach((key, value) -> {
+            logger.info("{}: {}", key, value);
+        });
+        // the error less than 5%
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.2")) < testRange / 20);
+    }
+}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
index fc5368a..2749816 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
@@ -28,7 +28,7 @@ public class LiteClientConfig {
* <p>
* E.g.
* <p>If you use Random strategy, the format like: 127.0.0.1:10105;127.0.0.2:10105
- * <p>If you use weighted round robin strategy, the format like: 127.0.0.1:10105:1;127.0.0.2:10105:2
+ * <p>If you use weighted round robin or weighted random strategy, the format like: 127.0.0.1:10105:1;127.0.0.2:10105:2
*/
private String liteEventMeshAddr = "127.0.0.1:10105";
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
index 4350613..5db7e54 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
@@ -25,6 +25,7 @@ import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.RandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.Weight;
import org.apache.eventmesh.common.loadbalance.WeightRoundRobinLoadBalanceSelector;
+import org.apache.eventmesh.common.loadbalance.WeightRandomLoadBalanceSelector;
import java.util.ArrayList;
import java.util.List;
@@ -37,39 +38,16 @@ public class HttpLoadBalanceUtils {
public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelector(LiteClientConfig liteClientConfig)
throws EventMeshException {
- List<String> eventMeshAddrs = Splitter.on(";").splitToList(liteClientConfig.getLiteEventMeshAddr());
- if (CollectionUtils.isEmpty(eventMeshAddrs)) {
- throw new EventMeshException("liteEventMeshAddr can not be empty");
- }
-
LoadBalanceSelector<String> eventMeshServerSelector = null;
switch (liteClientConfig.getLoadBalanceType()) {
case RANDOM:
- List<String> eventMeshAddrList = new ArrayList<>();
- for (String eventMeshAddr : eventMeshAddrs) {
- if (!IP_PORT_PATTERN.matcher(eventMeshAddr).matches()) {
- throw new EventMeshException(
- String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
- }
- eventMeshAddrList.add(eventMeshAddr);
- }
- eventMeshServerSelector = new RandomLoadBalanceSelector<>(eventMeshAddrList);
+ eventMeshServerSelector = new RandomLoadBalanceSelector<>(buildClusterGroupFromConfig(liteClientConfig));
+ break;
+ case WEIGHT_RANDOM:
+ eventMeshServerSelector = new WeightRandomLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(liteClientConfig));
break;
case WEIGHT_ROUND_ROBIN:
- List<Weight<String>> eventMeshAddrWeightList = new ArrayList<>();
- for (String eventMeshAddrWight : eventMeshAddrs) {
- if (!IP_PORT_WEIGHT_PATTERN.matcher(eventMeshAddrWight).matches()) {
- throw new EventMeshException(
- String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
- }
- int splitIndex = eventMeshAddrWight.lastIndexOf(":");
- Weight<String> weight = new Weight<>(
- eventMeshAddrWight.substring(0, splitIndex),
- Integer.parseInt(eventMeshAddrWight.substring(splitIndex + 1))
- );
- eventMeshAddrWeightList.add(weight);
- }
- eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(eventMeshAddrWeightList);
+ eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(liteClientConfig));
break;
default:
// ignore
@@ -80,4 +58,44 @@ public class HttpLoadBalanceUtils {
return eventMeshServerSelector;
}
+    /**
+     * Builds the weighted cluster group from the configured eventMesh addresses.
+     * Every entry must match {@code IP_PORT_WEIGHT_PATTERN}; the text after the last
+     * {@code ':'} is parsed as the weight and the text before it is used as the target,
+     * e.g. {@code 127.0.0.1:10105:1}.
+     *
+     * @param liteClientConfig client config holding the {@code ;}-separated address list
+     * @return one {@link Weight} per configured address, in configuration order
+     * @throws EventMeshException if the address list is empty or an entry is malformed
+     */
+    private static List<Weight<String>> buildWeightedClusterGroupFromConfig(LiteClientConfig liteClientConfig)
+            throws EventMeshException {
+        List<Weight<String>> eventMeshAddrWeightList = new ArrayList<>();
+        for (String eventMeshAddrWeight : splitEventMeshAddrs(liteClientConfig)) {
+            if (!IP_PORT_WEIGHT_PATTERN.matcher(eventMeshAddrWeight).matches()) {
+                throw illegalEventMeshAddr(liteClientConfig);
+            }
+            // The weight is the last ':'-separated field; everything before it is the address.
+            int splitIndex = eventMeshAddrWeight.lastIndexOf(":");
+            Weight<String> weight = new Weight<>(
+                    eventMeshAddrWeight.substring(0, splitIndex),
+                    Integer.parseInt(eventMeshAddrWeight.substring(splitIndex + 1))
+            );
+            eventMeshAddrWeightList.add(weight);
+        }
+        return eventMeshAddrWeightList;
+    }
+
+    /**
+     * Builds the unweighted cluster group from the configured eventMesh addresses.
+     * Every entry must match {@code IP_PORT_PATTERN}, e.g. {@code 127.0.0.1:10105}.
+     *
+     * @param liteClientConfig client config holding the {@code ;}-separated address list
+     * @return the configured addresses, in configuration order
+     * @throws EventMeshException if the address list is empty or an entry is malformed
+     */
+    private static List<String> buildClusterGroupFromConfig(LiteClientConfig liteClientConfig)
+            throws EventMeshException {
+        List<String> eventMeshAddrList = new ArrayList<>();
+        for (String eventMeshAddr : splitEventMeshAddrs(liteClientConfig)) {
+            if (!IP_PORT_PATTERN.matcher(eventMeshAddr).matches()) {
+                throw illegalEventMeshAddr(liteClientConfig);
+            }
+            eventMeshAddrList.add(eventMeshAddr);
+        }
+        return eventMeshAddrList;
+    }
+
+    /**
+     * Splits the configured address string on {@code ';'}, trimming each entry.
+     *
+     * @throws EventMeshException if the split yields no entries
+     */
+    private static List<String> splitEventMeshAddrs(LiteClientConfig liteClientConfig)
+            throws EventMeshException {
+        List<String> eventMeshAddrs = Splitter.on(";").trimResults().splitToList(liteClientConfig.getLiteEventMeshAddr());
+        if (CollectionUtils.isEmpty(eventMeshAddrs)) {
+            throw new EventMeshException("liteEventMeshAddr can not be empty");
+        }
+        return eventMeshAddrs;
+    }
+
+    /** Creates the exception reported when a configured address entry is malformed. */
+    private static EventMeshException illegalEventMeshAddr(LiteClientConfig liteClientConfig) {
+        return new EventMeshException(
+                String.format("liteEventMeshAddr:%s is illegal", liteClientConfig.getLiteEventMeshAddr()));
+    }
}
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
index d393c24..7355c4f 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
@@ -44,4 +44,14 @@ public class HttpLoadBalanceUtilsTest {
.createEventMeshServerLoadBalanceSelector(liteClientConfig);
Assert.assertEquals(LoadBalanceType.WEIGHT_ROUND_ROBIN, weightRoundRobinSelector.getType());
}
+
+    /**
+     * A config using the weighted "ip:port:weight" address format together with the
+     * WEIGHT_RANDOM strategy must yield a selector reporting the WEIGHT_RANDOM type.
+     */
+    @Test
+    public void testCreateWeightRandomSelector() throws EventMeshException {
+        LiteClientConfig weightedConfig = new LiteClientConfig()
+                .setLiteEventMeshAddr("127.0.0.1:1001:1;127.0.0.2:1001:2")
+                .setLoadBalanceType(LoadBalanceType.WEIGHT_RANDOM);
+        LoadBalanceSelector<String> weightRandomSelector =
+                HttpLoadBalanceUtils.createEventMeshServerLoadBalanceSelector(weightedConfig);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRandomSelector.getType());
+    }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org