You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/10/04 09:58:26 UTC

[incubator-eventmesh] branch develop updated: add weighted random load balance selector (#526)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8b6c827  add weighted random load balance selector (#526)
8b6c827 is described below

commit 8b6c82722afadc577f837adc9ed74a43e818c843
Author: horoc <63...@users.noreply.github.com>
AuthorDate: Mon Oct 4 17:58:00 2021 +0800

    add weighted random load balance selector (#526)
---
 .../common/loadbalance/LoadBalanceType.java        |  3 +-
 .../WeightRandomLoadBalanceSelector.java           | 76 +++++++++++++++++++
 .../WeightRandomLoadBalanceSelectorTest.java       | 86 ++++++++++++++++++++++
 .../client/http/conf/LiteClientConfig.java         |  2 +-
 .../client/http/util/HttpLoadBalanceUtils.java     | 74 ++++++++++++-------
 .../client/http/util/HttpLoadBalanceUtilsTest.java | 10 +++
 6 files changed, 221 insertions(+), 30 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
index 858a69b..6096abd 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
@@ -21,7 +21,8 @@ package org.apache.eventmesh.common.loadbalance;
 
 public enum LoadBalanceType {
     RANDOM(0, "random load balance strategy"),
-    WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy");
+    WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy"),
+    WEIGHT_RANDOM(2, "weight random load balance strategy");
 
     private int code;
     private String desc;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java
new file mode 100644
index 0000000..1fc239d
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.eventmesh.common.EventMeshException;
+
+import java.util.List;
+
+/**
+ * This selector use the weighted random strategy to select from list.
+ * If all the weights are same, it will randomly select one from list.
+ * If the weights are different, it will select one by using RandomUtils.nextInt(0, w0 + w1 ... + wn)
+ *
+ * @param <T> Target type
+ */
+public class WeightRandomLoadBalanceSelector<T> implements LoadBalanceSelector<T> {
+
+    private final List<Weight<T>> clusterGroup;
+
+    private final int totalWeight;
+
+    private boolean sameWeightGroup = true;
+
+    public WeightRandomLoadBalanceSelector(List<Weight<T>> clusterGroup) throws EventMeshException {
+        if (CollectionUtils.isEmpty(clusterGroup)) {
+            throw new EventMeshException("clusterGroup can not be empty");
+        }
+        int totalWeight = 0;
+        int firstWeight = clusterGroup.get(0).getWeight();
+        for (Weight<T> weight : clusterGroup) {
+            totalWeight += weight.getWeight();
+            if (sameWeightGroup && firstWeight != weight.getWeight()) {
+                sameWeightGroup = false;
+            }
+        }
+        this.clusterGroup = clusterGroup;
+        this.totalWeight = totalWeight;
+    }
+
+    @Override
+    public T select() {
+        if (!sameWeightGroup) {
+            int targetWeight = RandomUtils.nextInt(0, totalWeight);
+            for (Weight<T> weight : clusterGroup) {
+                targetWeight -= weight.getWeight();
+                if (targetWeight < 0) {
+                    return weight.getTarget();
+                }
+            }
+        }
+        int length = clusterGroup.size();
+        return clusterGroup.get(RandomUtils.nextInt(0, length)).getTarget();
+    }
+
+    @Override
+    public LoadBalanceType getType() {
+        return LoadBalanceType.WEIGHT_RANDOM;
+    }
+}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java
new file mode 100644
index 0000000..5331630
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightRandomLoadBalanceSelectorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.summingInt;
+
+public class WeightRandomLoadBalanceSelectorTest {
+
+    private Logger logger = LoggerFactory.getLogger(WeightRandomLoadBalanceSelectorTest.class);
+
+    @Test
+    public void testSelect() throws Exception {
+        List<Weight<String>> weightList = new ArrayList<>();
+        weightList.add(new Weight<>("192.168.0.1", 10));
+        weightList.add(new Weight<>("192.168.0.2", 20));
+        weightList.add(new Weight<>("192.168.0.3", 40));
+        WeightRandomLoadBalanceSelector<String> weightRandomLoadBalanceSelector = new WeightRandomLoadBalanceSelector<>(weightList);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRandomLoadBalanceSelector.getType());
+        int testRange = 100_000;
+        Map<String, Integer> addressToNum = IntStream.range(0, testRange)
+                .mapToObj(i -> weightRandomLoadBalanceSelector.select())
+                .collect(groupingBy(Function.identity(), summingInt(i -> 1)));
+
+        addressToNum.forEach((key, value) -> {
+            logger.info("{}: {}", key, value);
+        });
+        System.out.printf(addressToNum.toString());
+        // the error less than 5%
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.2") * 2) < testRange / 20);
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.1") * 4) < testRange / 20);
+    }
+
+    @Test
+    public void testSameWeightSelect() throws Exception {
+        List<Weight<String>> weightList = new ArrayList<>();
+        weightList.add(new Weight<>("192.168.0.1", 10));
+        weightList.add(new Weight<>("192.168.0.2", 10));
+        weightList.add(new Weight<>("192.168.0.3", 10));
+        WeightRandomLoadBalanceSelector<String> weightRandomLoadBalanceSelector = new WeightRandomLoadBalanceSelector<>(weightList);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRandomLoadBalanceSelector.getType());
+
+        int testRange = 100_000;
+        Map<String, Integer> addressToNum = IntStream.range(0, testRange)
+                .mapToObj(i -> weightRandomLoadBalanceSelector.select())
+                .collect(groupingBy(Function.identity(), summingInt(i -> 1)));
+
+        Field field = WeightRandomLoadBalanceSelector.class.getDeclaredField("sameWeightGroup");
+        field.setAccessible(true);
+        boolean sameWeightGroup = (boolean) field.get(weightRandomLoadBalanceSelector);
+        Assert.assertTrue(sameWeightGroup);
+
+        addressToNum.forEach((key, value) -> {
+            logger.info("{}: {}", key, value);
+        });
+        // the error less than 5%
+        Assert.assertTrue(Math.abs(addressToNum.get("192.168.0.3") - addressToNum.get("192.168.0.2")) < testRange / 20);
+    }
+}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
index fc5368a..2749816 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/conf/LiteClientConfig.java
@@ -28,7 +28,7 @@ public class LiteClientConfig {
      * <p>
      * E.g.
      * <p>If you use Random strategy, the format like: 127.0.0.1:10105;127.0.0.2:10105
-     * <p>If you use weighted round robin strategy, the format like: 127.0.0.1:10105:1;127.0.0.2:10105:2
+     * <p>If you use weighted round robin or weighted random strategy, the format like: 127.0.0.1:10105:1;127.0.0.2:10105:2
      */
     private String liteEventMeshAddr = "127.0.0.1:10105";
 
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
index 4350613..5db7e54 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
@@ -25,6 +25,7 @@ import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
 import org.apache.eventmesh.common.loadbalance.RandomLoadBalanceSelector;
 import org.apache.eventmesh.common.loadbalance.Weight;
 import org.apache.eventmesh.common.loadbalance.WeightRoundRobinLoadBalanceSelector;
+import org.apache.eventmesh.common.loadbalance.WeightRandomLoadBalanceSelector;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -37,39 +38,16 @@ public class HttpLoadBalanceUtils {
 
     public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelector(LiteClientConfig liteClientConfig)
             throws EventMeshException {
-        List<String> eventMeshAddrs = Splitter.on(";").splitToList(liteClientConfig.getLiteEventMeshAddr());
-        if (CollectionUtils.isEmpty(eventMeshAddrs)) {
-            throw new EventMeshException("liteEventMeshAddr can not be empty");
-        }
-
         LoadBalanceSelector<String> eventMeshServerSelector = null;
         switch (liteClientConfig.getLoadBalanceType()) {
             case RANDOM:
-                List<String> eventMeshAddrList = new ArrayList<>();
-                for (String eventMeshAddr : eventMeshAddrs) {
-                    if (!IP_PORT_PATTERN.matcher(eventMeshAddr).matches()) {
-                        throw new EventMeshException(
-                                String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
-                    }
-                    eventMeshAddrList.add(eventMeshAddr);
-                }
-                eventMeshServerSelector = new RandomLoadBalanceSelector<>(eventMeshAddrList);
+                eventMeshServerSelector = new RandomLoadBalanceSelector<>(buildClusterGroupFromConfig(liteClientConfig));
+                break;
+            case WEIGHT_RANDOM:
+                eventMeshServerSelector = new WeightRandomLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(liteClientConfig));
                 break;
             case WEIGHT_ROUND_ROBIN:
-                List<Weight<String>> eventMeshAddrWeightList = new ArrayList<>();
-                for (String eventMeshAddrWight : eventMeshAddrs) {
-                    if (!IP_PORT_WEIGHT_PATTERN.matcher(eventMeshAddrWight).matches()) {
-                        throw new EventMeshException(
-                                String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
-                    }
-                    int splitIndex = eventMeshAddrWight.lastIndexOf(":");
-                    Weight<String> weight = new Weight<>(
-                            eventMeshAddrWight.substring(0, splitIndex),
-                            Integer.parseInt(eventMeshAddrWight.substring(splitIndex + 1))
-                    );
-                    eventMeshAddrWeightList.add(weight);
-                }
-                eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(eventMeshAddrWeightList);
+                eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(liteClientConfig));
                 break;
             default:
                 // ignore
@@ -80,4 +58,44 @@ public class HttpLoadBalanceUtils {
         return eventMeshServerSelector;
     }
 
+    private static List<Weight<String>> buildWeightedClusterGroupFromConfig(LiteClientConfig liteClientConfig)
+            throws EventMeshException {
+        List<String> eventMeshAddrs = Splitter.on(";").trimResults().splitToList(liteClientConfig.getLiteEventMeshAddr());
+        if (CollectionUtils.isEmpty(eventMeshAddrs)) {
+            throw new EventMeshException("liteEventMeshAddr can not be empty");
+        }
+
+        List<Weight<String>> eventMeshAddrWeightList = new ArrayList<>();
+        for (String eventMeshAddrWight : eventMeshAddrs) {
+            if (!IP_PORT_WEIGHT_PATTERN.matcher(eventMeshAddrWight).matches()) {
+                throw new EventMeshException(
+                        String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
+            }
+            int splitIndex = eventMeshAddrWight.lastIndexOf(":");
+            Weight<String> weight = new Weight<>(
+                    eventMeshAddrWight.substring(0, splitIndex),
+                    Integer.parseInt(eventMeshAddrWight.substring(splitIndex + 1))
+            );
+            eventMeshAddrWeightList.add(weight);
+        }
+        return eventMeshAddrWeightList;
+    }
+
+    private static List<String> buildClusterGroupFromConfig(LiteClientConfig liteClientConfig)
+            throws EventMeshException {
+        List<String> eventMeshAddrs = Splitter.on(";").trimResults().splitToList(liteClientConfig.getLiteEventMeshAddr());
+        if (CollectionUtils.isEmpty(eventMeshAddrs)) {
+            throw new EventMeshException("liteEventMeshAddr can not be empty");
+        }
+
+        List<String> eventMeshAddrList = new ArrayList<>();
+        for (String eventMeshAddr : eventMeshAddrs) {
+            if (!IP_PORT_PATTERN.matcher(eventMeshAddr).matches()) {
+                throw new EventMeshException(
+                        String.format("liteEventMeshAddr:%s is not illegal", liteClientConfig.getLiteEventMeshAddr()));
+            }
+            eventMeshAddrList.add(eventMeshAddr);
+        }
+        return eventMeshAddrList;
+    }
 }
diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
index d393c24..7355c4f 100644
--- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
+++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtilsTest.java
@@ -44,4 +44,14 @@ public class HttpLoadBalanceUtilsTest {
                 .createEventMeshServerLoadBalanceSelector(liteClientConfig);
         Assert.assertEquals(LoadBalanceType.WEIGHT_ROUND_ROBIN, weightRoundRobinSelector.getType());
     }
+
+    @Test
+    public void testCreateWeightRandomSelector() throws EventMeshException {
+        LiteClientConfig liteClientConfig = new LiteClientConfig()
+                .setLiteEventMeshAddr("127.0.0.1:1001:1;127.0.0.2:1001:2")
+                .setLoadBalanceType(LoadBalanceType.WEIGHT_RANDOM);
+        LoadBalanceSelector<String> weightRoundRobinSelector = HttpLoadBalanceUtils
+                .createEventMeshServerLoadBalanceSelector(liteClientConfig);
+        Assert.assertEquals(LoadBalanceType.WEIGHT_RANDOM, weightRoundRobinSelector.getType());
+    }
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org