You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/22 08:11:38 UTC

[dubbo-spi-extensions] branch master updated: add PeakEwmaLoadBalance (#68)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 48abd0b  add PeakEwmaLoadBalance (#68)
48abd0b is described below

commit 48abd0be229f02b25ae3d17c0b66223dacfc1133
Author: lmj <10...@qq.com>
AuthorDate: Wed Sep 22 16:11:34 2021 +0800

    add PeakEwmaLoadBalance (#68)
    
    * add PeakEwmaLoadBalance
    
    * rename moudle
    
    * move ASF license to first line
---
 .../pom.xml                                        |  19 +-
 .../cluster/loadbalance/PeakEwmaLoadBalance.java   | 137 ++++++++++
 .../loadbalance/AbstractLoadBalanceTest.java       |  82 ++++++
 .../cluster/loadbalance/LoadBalanceBaseTest.java   | 297 +++++++++++++++++++++
 .../loadbalance/PeakEwmaLoadBalanceTest.java       | 113 ++++++++
 dubbo-cluster-extensions/pom.xml                   |   1 +
 6 files changed, 643 insertions(+), 6 deletions(-)

diff --git a/dubbo-cluster-extensions/pom.xml b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/pom.xml
similarity index 72%
copy from dubbo-cluster-extensions/pom.xml
copy to dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/pom.xml
index f3a3b20..471b76d 100644
--- a/dubbo-cluster-extensions/pom.xml
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/pom.xml
@@ -19,18 +19,25 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
+        <artifactId>dubbo-cluster-extensions</artifactId>
         <groupId>org.apache.dubbo.extensions</groupId>
-        <artifactId>extensions-parent</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>dubbo-cluster-extensions</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>dubbo-cluster-broadcast-1</module>
-    </modules>
+    <artifactId>dubbo-cluster-loadbalance-peakewma</artifactId>
+    <packaging>jar</packaging>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-cluster</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-rpc-api</artifactId>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java
new file mode 100644
index 0000000..b87ae84
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcStatus;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <p>
+ * PeakEwmaLoadBalance is designed to converge quickly when encountering slow endpoints.
+ * It is quick to react to latency spikes, recovering only cautiously. Peak EWMA takes
+ * history into account, so that slow behavior is penalized relative to the
+ * supplied `decayTime`.
+ * If several invokers have the same cost, one of them is picked at random; weight is
+ * not taken into account.
+ * <p>
+ * Inspiration drawn from:
+ * https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
+ * /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
+ */
+public class PeakEwmaLoadBalance extends AbstractLoadBalance {
+
+    public static final String NAME = "peakewma";
+
+    // Configuration key from which the decay time is read once, at class initialization.
+    private static final String PEAK_EWMA_DECAY_TIME = "peakEwmaDecayTime";
+
+    // Base cost given to an invoker that has active calls but no latency history yet.
+    private static final double PENALTY = Long.MAX_VALUE >> 16;
+
+    // The mean lifetime of `cost`, it reaches its half-life after decayTime*ln(2).
+    // It is divided into a difference of System.currentTimeMillis() values; defaults to 10_000.
+    private static final double decayTime = ApplicationModel.getEnvironment().getConfiguration().getInt(PEAK_EWMA_DECAY_TIME, 10_000);
+
+    // One Metric per RpcStatus, created lazily the first time that status is seen in doSelect.
+    private final ConcurrentMap<RpcStatus, Metric> methodMap = new ConcurrentHashMap<>();
+
+    /**
+     * Peak-sensitive moving average of the succeeded round-trip time reported by one {@link RpcStatus}.
+     */
+    protected static class Metric {
+        // last timestamp in millis at which we observed a running time
+        private volatile long lastUpdateTime;
+
+        // ewma of rtt, sensitive to peaks.
+        private volatile double cost;
+
+        // source of the succeeded count, succeeded elapsed time and active count
+        private final RpcStatus rpcStatus;
+        // counter values seen at the previous observation; used to average only the new calls
+        private long succeededOffset;
+        private long succeededElapsedOffset;
+
+        // guards observe() and the cost/active snapshot taken in getCost()
+        final ReentrantLock ewmaLock = new ReentrantLock();
+
+        public Metric(RpcStatus rpcStatus) {
+            this.rpcStatus = rpcStatus;
+            this.lastUpdateTime = System.currentTimeMillis();
+            this.cost = 0.0;
+            this.succeededOffset = 0;
+            this.succeededElapsedOffset = 0;
+        }
+
+        /**
+         * Folds the calls that succeeded since the previous observation into {@code cost}.
+         * The only caller, {@link #getCost()}, holds {@code ewmaLock} while this runs.
+         */
+        private void observe() {
+            // average elapsed time of the calls that succeeded since the last observation (0 if none)
+            double rtt = 0;
+            long succeed = this.rpcStatus.getSucceeded() - this.succeededOffset;
+            if (succeed != 0) {
+                rtt = (this.rpcStatus.getSucceededElapsed() * 1.0 - this.succeededElapsedOffset) / succeed;
+            }
+
+            final long currentTime = System.currentTimeMillis();
+            // clamp to 0 so a clock that moved backwards cannot produce a weight above 1
+            long td = Math.max(currentTime - lastUpdateTime, 0);
+            double w = Math.exp(-td / decayTime);
+            if (rtt > cost) {
+                // a peak replaces the average immediately
+                cost = rtt;
+            } else {
+                // otherwise the old cost decays towards the new sample
+                cost = cost * w + rtt * (1.0 - w);
+            }
+
+            lastUpdateTime = currentTime;
+            succeededOffset = rpcStatus.getSucceeded();
+            succeededElapsedOffset = rpcStatus.getSucceededElapsed();
+        }
+
+        /**
+         * Refreshes the average and returns the score used to rank this invoker.
+         *
+         * @return {@code PENALTY + active} when there is no latency history but calls are active,
+         *         otherwise {@code cost * (active + 1)}
+         */
+        private double getCost() {
+            ewmaLock.lock();
+            try {
+                observe();
+                int active = rpcStatus.getActive();
+
+                // Computed while still holding the lock so that `cost` and `active` belong to the same snapshot.
+                //If we don't have any latency history, we penalize the host on the first probe.
+                return (cost == 0.0 && active != 0) ? PENALTY + active : cost * (active + 1);
+            } finally {
+                // always release, even if observe() or getActive() throws
+                ewmaLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Selects the invoker with the lowest estimated cost for the invoked method.
+     *
+     * @param invokers   candidates to choose from
+     * @param url        not used by this implementation
+     * @param invocation supplies the method name used to look up the {@link RpcStatus}
+     * @return the cheapest invoker; ties are broken uniformly at random
+     */
+    @Override
+    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+        int length = invokers.size();
+        double minResponse = Double.MAX_VALUE;
+
+        // indexes of all invokers that share the current minimum
+        List<Integer> selectInvokerIndexList = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            Invoker<T> invoker = invokers.get(i);
+            RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
+            Metric metric = methodMap.computeIfAbsent(rpcStatus, Metric::new);
+
+            // calculate the estimated response time from the product of active connections and succeeded average elapsed time.
+            double estimateResponse = metric.getCost();
+            if (estimateResponse < minResponse) {
+                selectInvokerIndexList.clear();
+                selectInvokerIndexList.add(i);
+                minResponse = estimateResponse;
+            } else if (estimateResponse == minResponse) {
+                selectInvokerIndexList.add(i);
+            }
+        }
+
+        return invokers.get(selectInvokerIndexList.get(ThreadLocalRandom.current().nextInt(selectInvokerIndexList.size())));
+    }
+}
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java
new file mode 100644
index 0000000..f880ef3
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+public class AbstractLoadBalanceTest {
+
+    private AbstractLoadBalance balance = new AbstractLoadBalance() {
+        @Override
+        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
+            return null;
+        }
+    };
+
+    @Test
+    public void testGetWeight() {
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("say");
+
+        Invoker invoker1 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        URL url1 = new ServiceConfigURL("", "", 0, "DemoService", new HashMap<>());
+        url1 = url1.addParameter(TIMESTAMP_KEY, System.currentTimeMillis() - Integer.MAX_VALUE - 1);
+        given(invoker1.getUrl()).willReturn(url1);
+
+        Invoker invoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        URL url2 = new ServiceConfigURL("", "", 0, "DemoService", new HashMap<>());
+        url2 = url2.addParameter(TIMESTAMP_KEY, System.currentTimeMillis() - 10 * 60 * 1000L - 1);
+        given(invoker2.getUrl()).willReturn(url2);
+
+        Assertions.assertEquals(balance.getWeight(invoker1, invocation), balance.getWeight(invoker2, invocation));
+    }
+
+    @Test
+    public void testGetRegistryWeight() {
+        RpcInvocation invocation = new RpcInvocation();
+        invocation.setMethodName("say");
+
+        Invoker invoker1 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        URL url1 = new ServiceConfigURL("", "", 0, "DemoService", new HashMap<>());
+        url1 = url1.addParameter(REGISTRY_KEY + "." + WEIGHT_KEY, 10);
+        given(invoker1.getUrl()).willReturn(url1);
+
+        Invoker invoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        URL url2 = new ServiceConfigURL("", "", 0, "org.apache.dubbo.registry.RegistryService", new HashMap<>());
+        url2 = url2.addParameter(REGISTRY_KEY + "." + WEIGHT_KEY, 20);
+        given(invoker2.getUrl()).willReturn(url2);
+
+        Assertions.assertEquals(100, balance.getWeight(invoker1, invocation));
+        Assertions.assertEquals(20, balance.getWeight(invoker2, invocation));
+    }
+}
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
new file mode 100644
index 0000000..46cce2e
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/LoadBalanceBaseTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.RpcStatus;
+import org.apache.dubbo.rpc.cluster.LoadBalance;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
+import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Shared fixture for load-balance tests: mocked invokers, pre-seeded RpcStatus counters and selection-counting helpers.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class LoadBalanceBaseTest {
+    Invocation invocation; // mocked in setUp(); its method name is "method1"
+    List<Invoker<LoadBalanceBaseTest>> invokers = new ArrayList<Invoker<LoadBalanceBaseTest>>(); // filled by setUp() with invoker1..invoker5
+    Invoker<LoadBalanceBaseTest> invoker1;
+    Invoker<LoadBalanceBaseTest> invoker2;
+    Invoker<LoadBalanceBaseTest> invoker3;
+    Invoker<LoadBalanceBaseTest> invoker4;
+    Invoker<LoadBalanceBaseTest> invoker5;
+
+    RpcStatus weightTestRpcStatus1; // the weightTestRpcStatusN fields are looked up in before() for method "test"
+    RpcStatus weightTestRpcStatus2;
+    RpcStatus weightTestRpcStatus3;
+    RpcStatus weightTestRpcStatus5;
+
+    RpcInvocation weightTestInvocation; // created in before() with method name "test"
+
+    /**
+     * Class-level setup hook; the body is currently empty.
+     */
+    @BeforeAll
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     * Mocks the invocation and five available invokers (test://127.0.0.1:1..5/DemoService) and adds them to {@code invokers}.
+     */
+    @BeforeEach
+    public void setUp() throws Exception {
+
+        invocation = mock(Invocation.class);
+        given(invocation.getMethodName()).willReturn("method1");
+        given(invocation.getArguments()).willReturn(new Object[]{"arg1", "arg2", "arg3"});
+
+        invoker1 = mock(Invoker.class);
+        invoker2 = mock(Invoker.class);
+        invoker3 = mock(Invoker.class);
+        invoker4 = mock(Invoker.class);
+        invoker5 = mock(Invoker.class);
+
+        URL url1 = URL.valueOf("test://127.0.0.1:1/DemoService"); // the five URLs differ only in their port
+        URL url2 = URL.valueOf("test://127.0.0.1:2/DemoService");
+        URL url3 = URL.valueOf("test://127.0.0.1:3/DemoService");
+        URL url4 = URL.valueOf("test://127.0.0.1:4/DemoService");
+        URL url5 = URL.valueOf("test://127.0.0.1:5/DemoService");
+
+        given(invoker1.isAvailable()).willReturn(true);
+        given(invoker1.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(invoker1.getUrl()).willReturn(url1);
+
+        given(invoker2.isAvailable()).willReturn(true);
+        given(invoker2.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(invoker2.getUrl()).willReturn(url2);
+
+        given(invoker3.isAvailable()).willReturn(true);
+        given(invoker3.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(invoker3.getUrl()).willReturn(url3);
+
+        given(invoker4.isAvailable()).willReturn(true);
+        given(invoker4.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(invoker4.getUrl()).willReturn(url4);
+
+        given(invoker5.isAvailable()).willReturn(true);
+        given(invoker5.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(invoker5.getUrl()).willReturn(url5);
+
+        invokers.add(invoker1);
+        invokers.add(invoker2);
+        invokers.add(invoker3);
+        invokers.add(invoker4);
+        invokers.add(invoker5);
+    }
+
+    public Map<Invoker, AtomicLong> getInvokeCounter(int runs, String loadbalanceName) { // counts how often each of `invokers` is selected over `runs` calls
+        Map<Invoker, AtomicLong> counter = new ConcurrentHashMap<Invoker, AtomicLong>();
+        LoadBalance lb = getLoadBalance(loadbalanceName);
+        for (Invoker invoker : invokers) {
+            counter.put(invoker, new AtomicLong(0)); // every invoker starts at zero so unselected ones still appear in the result
+        }
+        URL url = invokers.get(0).getUrl(); // the first invoker's URL is passed as the consumer URL
+        for (int i = 0; i < runs; i++) {
+            Invoker sinvoker = lb.select(invokers, url, invocation);
+            counter.get(sinvoker).incrementAndGet();
+        }
+        return counter;
+    }
+
+    protected AbstractLoadBalance getLoadBalance(String loadbalanceName) { // resolves the LoadBalance extension registered under the given name
+        return (AbstractLoadBalance) ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalanceName);
+    }
+
+    @Test
+    public void testLoadBalanceWarmup() { // pins the warmup weight at selected uptimes, using the default warmup and weight
+        Assertions.assertEquals(1, calculateDefaultWarmupWeight(0));
+        Assertions.assertEquals(1, calculateDefaultWarmupWeight(13));
+        Assertions.assertEquals(1, calculateDefaultWarmupWeight(6 * 1000));
+        Assertions.assertEquals(2, calculateDefaultWarmupWeight(12 * 1000));
+        Assertions.assertEquals(10, calculateDefaultWarmupWeight(60 * 1000));
+        Assertions.assertEquals(50, calculateDefaultWarmupWeight(5 * 60 * 1000));
+        Assertions.assertEquals(50, calculateDefaultWarmupWeight(5 * 60 * 1000 + 23));
+        Assertions.assertEquals(50, calculateDefaultWarmupWeight(5 * 60 * 1000 + 5999));
+        Assertions.assertEquals(51, calculateDefaultWarmupWeight(5 * 60 * 1000 + 6000));
+        Assertions.assertEquals(90, calculateDefaultWarmupWeight(9 * 60 * 1000));
+        Assertions.assertEquals(98, calculateDefaultWarmupWeight(10 * 60 * 1000 - 12 * 1000));
+        Assertions.assertEquals(99, calculateDefaultWarmupWeight(10 * 60 * 1000 - 6 * 1000));
+        Assertions.assertEquals(100, calculateDefaultWarmupWeight(10 * 60 * 1000)); // full weight is expected from this uptime on
+        Assertions.assertEquals(100, calculateDefaultWarmupWeight(20 * 60 * 1000));
+    }
+
+    /**
+     * Computes the warmup weight for the given uptime using DEFAULT_WARMUP and DEFAULT_WEIGHT.
+     *
+     * @return the result of AbstractLoadBalance.calculateWarmupWeight for {@code uptime}
+     */
+    private static int calculateDefaultWarmupWeight(int uptime) {
+        return AbstractLoadBalance.calculateWarmupWeight(uptime, DEFAULT_WARMUP, DEFAULT_WEIGHT);
+    }
+
+    /*------------------------------------test invokers for weight---------------------------------------*/
+
+    protected static class InvokeResult { // per-invoker tally: selection count plus the weights needed to derive the expected share
+        private AtomicLong count = new AtomicLong();
+        private int weight = 0;
+        private int totalWeight = 0; // stays 0 until setTotalWeight is called
+
+        public InvokeResult(int weight) {
+            this.weight = weight;
+        }
+
+        public AtomicLong getCount() {
+            return count;
+        }
+
+        public int getWeight() {
+            return weight;
+        }
+
+        public int getTotalWeight() {
+            return totalWeight;
+        }
+
+        public void setTotalWeight(int totalWeight) {
+            this.totalWeight = totalWeight;
+        }
+
+        public int getExpected(int runCount) { // integer division; throws ArithmeticException while totalWeight is still 0
+            return getWeight() * runCount / getTotalWeight();
+        }
+
+        public float getDeltaPercentage(int runCount) { // absolute deviation of the actual count from the expected count, in percent
+            int expected = getExpected(runCount);
+            return Math.abs((expected - getCount().get()) * 100.0f / expected);
+        }
+
+        @Override
+        public String toString() {
+            return JSON.toJSONString(this);
+        }
+    }
+
+    protected List<Invoker<LoadBalanceBaseTest>> weightInvokers = new ArrayList<Invoker<LoadBalanceBaseTest>>(); // weightInvoker1, 2 and 3; filled in before()
+    protected List<Invoker<LoadBalanceBaseTest>> weightInvokersSR = new ArrayList<Invoker<LoadBalanceBaseTest>>(); // weightInvoker1, 2 and 5; filled in before()
+
+    protected Invoker<LoadBalanceBaseTest> weightInvoker1;
+    protected Invoker<LoadBalanceBaseTest> weightInvoker2;
+    protected Invoker<LoadBalanceBaseTest> weightInvoker3;
+    protected Invoker<LoadBalanceBaseTest> weightInvokerTmp; // stubbed in before() but not added to either list
+    protected Invoker<LoadBalanceBaseTest> weightInvoker5;
+
+    @BeforeEach
+    public void before() throws Exception { // builds the weighted invokers and seeds their RpcStatus counters
+        weightInvoker1 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        weightInvoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        weightInvoker3 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        weightInvokerTmp = mock(Invoker.class, Mockito.withSettings().stubOnly());
+        weightInvoker5 = mock(Invoker.class, Mockito.withSettings().stubOnly());
+
+        weightTestInvocation = new RpcInvocation();
+        weightTestInvocation.setMethodName("test");
+
+        URL url1 = URL.valueOf("test1://127.0.0.1:11/DemoService?weight=1&active=0"); // each URL has its own protocol name, which PeakEwmaLoadBalanceTest uses to tell invokers apart
+        URL url2 = URL.valueOf("test2://127.0.0.1:12/DemoService?weight=9&active=0");
+        URL url3 = URL.valueOf("test3://127.0.0.1:13/DemoService?weight=6&active=1");
+        URL urlTmp = URL.valueOf("test4://127.0.0.1:9999/DemoService?weight=11&active=0");
+        URL url5 = URL.valueOf("test5://127.0.0.1:15/DemoService?weight=15&active=0");
+
+        given(weightInvoker1.isAvailable()).willReturn(true);
+        given(weightInvoker1.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvoker1.getUrl()).willReturn(url1);
+
+        given(weightInvoker2.isAvailable()).willReturn(true);
+        given(weightInvoker2.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvoker2.getUrl()).willReturn(url2);
+
+        given(weightInvoker3.isAvailable()).willReturn(true);
+        given(weightInvoker3.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvoker3.getUrl()).willReturn(url3);
+
+        given(weightInvokerTmp.isAvailable()).willReturn(true);
+        given(weightInvokerTmp.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvokerTmp.getUrl()).willReturn(urlTmp);
+
+        given(weightInvoker5.isAvailable()).willReturn(true);
+        given(weightInvoker5.getInterface()).willReturn(LoadBalanceBaseTest.class);
+        given(weightInvoker5.getUrl()).willReturn(url5);
+
+        weightInvokers.add(weightInvoker1);
+        weightInvokers.add(weightInvoker2);
+        weightInvokers.add(weightInvoker3);
+
+        weightInvokersSR.add(weightInvoker1);
+        weightInvokersSR.add(weightInvoker2);
+        weightInvokersSR.add(weightInvoker5);
+
+        weightTestRpcStatus1 = RpcStatus.getStatus(weightInvoker1.getUrl(), weightTestInvocation.getMethodName());
+        weightTestRpcStatus2 = RpcStatus.getStatus(weightInvoker2.getUrl(), weightTestInvocation.getMethodName());
+        weightTestRpcStatus3 = RpcStatus.getStatus(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+        weightTestRpcStatus5 = RpcStatus.getStatus(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+
+
+        // weightTestRpcStatus3 active is 1
+        RpcStatus.beginCount(weightInvoker3.getUrl(), weightTestInvocation.getMethodName());
+
+        // weightTestRpcStatus5 shortest response time of success calls is bigger than 0
+        // weightTestRpcStatus5 active is 1 (begin, end with 5000L elapsed, then begin again)
+        RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+        RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 5000L, true);
+        RpcStatus.beginCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName());
+    }
+
+    protected Map<Invoker, InvokeResult> getWeightedInvokeResult(int runs, String loadbalanceName) { // like getInvokeCounter, but over weightInvokers and with weight bookkeeping
+        Map<Invoker, InvokeResult> counter = new ConcurrentHashMap<Invoker, InvokeResult>();
+        AbstractLoadBalance lb = getLoadBalance(loadbalanceName);
+        int totalWeight = 0;
+        for (int i = 0; i < weightInvokers.size(); i++) {
+            InvokeResult invokeResult = new InvokeResult(lb.getWeight(weightInvokers.get(i), weightTestInvocation));
+            counter.put(weightInvokers.get(i), invokeResult);
+            totalWeight += invokeResult.getWeight();
+        }
+        for (InvokeResult invokeResult : counter.values()) {
+            invokeResult.setTotalWeight(totalWeight); // every result gets the same total so getExpected can compute its share
+        }
+        URL url = weightInvokers.get(0).getUrl();
+        for (int i = 0; i < runs; i++) {
+            Invoker sinvoker = lb.select(weightInvokers, url, weightTestInvocation);
+            counter.get(sinvoker).getCount().incrementAndGet();
+        }
+        return counter;
+    }
+
+}
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java
new file mode 100644
index 0000000..13957db
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.loadbalance;
+
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+/**
+ * Concurrency tests for {@link PeakEwmaLoadBalance}: several threads select among
+ * {@code weightInvokersSR} and record simulated calls in {@link RpcStatus}.
+ */
+public class PeakEwmaLoadBalanceTest extends LoadBalanceBaseTest {
+
+    private static final int THREAD_NUM = 4;
+
+    private static final int INVOKE_NUM = 2_000;
+
+    // Value slept (in microseconds) and reported to RpcStatus as the elapsed time of the single "shaky" call.
+    // NOTE(review): the same number is used for both purposes — confirm the intended unit.
+    private static final long SHAKE_TIME = 50;
+
+    // how often each invoker was selected, summed over all worker threads
+    private final AtomicInteger sumInvoker1 = new AtomicInteger(0);
+    private final AtomicInteger sumInvoker2 = new AtomicInteger(0);
+    private final AtomicInteger sumInvoker5 = new AtomicInteger(0);
+
+    @Test
+    public void testWithoutShake() throws Exception {
+        runTasks(false);
+
+        Assertions.assertTrue(Math.abs(sumInvoker2.get() - sumInvoker1.get()) <= INVOKE_NUM);
+    }
+
+    @Test
+    public void testWithShake() throws Exception {
+        runTasks(true);
+
+        Assertions.assertTrue(sumInvoker1.get() <= INVOKE_NUM);
+    }
+
+    /**
+     * Runs {@code THREAD_NUM} selection tasks to completion.
+     * A failure inside any task is rethrown here, and the pool is always shut down.
+     *
+     * @param needShake whether each task simulates one slow call on invoker1
+     */
+    private void runTasks(boolean needShake) throws Exception {
+        //active -> 0
+        RpcStatus.endCount(weightInvoker5.getUrl(), weightTestInvocation.getMethodName(), 0L, true);
+
+        List<Callable<Boolean>> tasks = new ArrayList<>();
+        IntStream.range(0, THREAD_NUM).forEach(e -> tasks.add(getTask(needShake)));
+
+        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
+        try {
+            // get() rethrows whatever a task threw; without it a broken task would go unnoticed
+            for (java.util.concurrent.Future<Boolean> result : executorService.invokeAll(tasks)) {
+                Assertions.assertTrue(result.get());
+            }
+        } finally {
+            // release the worker threads even when a task failed or the wait was interrupted
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Builds a task that performs {@code INVOKE_NUM} selections on its own load balancer instance,
+     * records a simulated call for each, and counts which invoker was chosen.
+     *
+     * @param needShake if true, the first selection of invoker1 after iteration 100 reports {@code SHAKE_TIME}
+     */
+    private Callable<Boolean> getTask(boolean needShake) {
+        PeakEwmaLoadBalance lb = new PeakEwmaLoadBalance();
+        return () -> {
+            boolean needShakeTemp = needShake;
+            for (int i = 0; i < INVOKE_NUM; i++) {
+                Invoker selected = lb.select(weightInvokersSR, null, weightTestInvocation);
+                String protocol = selected.getUrl().getProtocol();
+
+                long elapsed;
+                if (i > 100 && needShakeTemp && protocol.equals("test1")) {
+                    //invoker1 shake
+                    needShakeTemp = false;
+                    elapsed = SHAKE_TIME;
+                } else {
+                    elapsed = ThreadLocalRandom.current().nextLong(5, 10);
+                }
+
+                // beginCount/endCount are static, so they are called on the class rather than on an instance
+                RpcStatus.beginCount(selected.getUrl(), weightTestInvocation.getMethodName());
+                TimeUnit.MICROSECONDS.sleep(elapsed);
+                RpcStatus.endCount(selected.getUrl(), weightTestInvocation.getMethodName(), elapsed, true);
+
+                if (protocol.equals("test1")) {
+                    sumInvoker1.incrementAndGet();
+                }
+
+                if (protocol.equals("test2")) {
+                    sumInvoker2.incrementAndGet();
+                }
+
+                if (protocol.equals("test5")) {
+                    sumInvoker5.incrementAndGet();
+                }
+            }
+            return true;
+        };
+    }
+}
diff --git a/dubbo-cluster-extensions/pom.xml b/dubbo-cluster-extensions/pom.xml
index f3a3b20..5dc0c1e 100644
--- a/dubbo-cluster-extensions/pom.xml
+++ b/dubbo-cluster-extensions/pom.xml
@@ -30,6 +30,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>dubbo-cluster-broadcast-1</module>
+        <module>dubbo-cluster-loadbalance-peakewma</module>
     </modules>