You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/05/29 08:14:00 UTC

[dubbo-spi-extensions] branch master updated: optimize peakEwmaLoadbalance (#106)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a30787  optimize peakEwmaLoadbalance (#106)
1a30787 is described below

commit 1a30787aee2dcab44c39c122120932dc5d0a2786
Author: lemonJ <10...@qq.com>
AuthorDate: Sun May 29 16:13:56 2022 +0800

    optimize peakEwmaLoadbalance (#106)
---
 .../cluster/loadbalance/PeakEwmaLoadBalance.java   | 33 ++++++++++++++--------
 .../org.apache.dubbo.rpc.cluster.LoadBalance       |  1 +
 .../loadbalance/PeakEwmaLoadBalanceTest.java       |  2 ++
 3 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java
index b87ae84..cbd2092 100644
--- a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalance.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcStatus;
 import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -42,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
  * /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
  */
-public class PeakEwmaLoadBalance extends AbstractLoadBalance {
+public class PeakEwmaLoadBalance extends AbstractLoadBalance implements ScopeModelAware {
 
     public static final String NAME = "peakewma";
 
@@ -50,8 +51,16 @@ public class PeakEwmaLoadBalance extends AbstractLoadBalance {
 
     private static final double PENALTY = Long.MAX_VALUE >> 16;
 
+    //double precision
+    private static final double ZERO_COST = 1E-6;
+
     // The mean lifetime of `cost`, it reaches its half-life after decayTime*ln(2).
-    private static double decayTime = ApplicationModel.getEnvironment().getConfiguration().getInt(PEAK_EWMA_DECAY_TIME, 10_000);
+    private static double decayTime;
+
+    @Override
+    public void setApplicationModel(ApplicationModel applicationModel) {
+        decayTime = applicationModel.getModelEnvironment().getConfiguration().getInt(PEAK_EWMA_DECAY_TIME, 10_000);
+    }
 
     private ConcurrentMap<RpcStatus, Metric> methodMap = new ConcurrentHashMap<>();
 
@@ -64,8 +73,8 @@ public class PeakEwmaLoadBalance extends AbstractLoadBalance {
 
         // calculate running time And active num
         private RpcStatus rpcStatus;
-        private long succeededOffset;
-        private long succeededElapsedOffset;
+        private long invokeOffset;
+        private long invokeElapsedOffset;
 
         //lock for get and set cost
         ReentrantLock ewmaLock = new ReentrantLock();
@@ -74,15 +83,15 @@ public class PeakEwmaLoadBalance extends AbstractLoadBalance {
             this.rpcStatus = rpcStatus;
             this.lastUpdateTime = System.currentTimeMillis();
             this.cost = 0.0;
-            this.succeededOffset = 0;
-            this.succeededElapsedOffset = 0;
+            this.invokeOffset = 0;
+            this.invokeElapsedOffset = 0;
         }
 
         private void observe() {
             double rtt = 0;
-            long succeed = this.rpcStatus.getSucceeded() - this.succeededOffset;
+            long succeed = this.rpcStatus.getSucceeded() - this.invokeOffset;
             if (succeed != 0) {
-                rtt = (this.rpcStatus.getSucceededElapsed() * 1.0 - this.succeededElapsedOffset) / succeed;
+                rtt = (this.rpcStatus.getSucceededElapsed() * 1.0 - this.invokeElapsedOffset) / succeed;
             }
 
             final long currentTime = System.currentTimeMillis();
@@ -95,8 +104,8 @@ public class PeakEwmaLoadBalance extends AbstractLoadBalance {
             }
 
             lastUpdateTime = currentTime;
-            succeededOffset = rpcStatus.getSucceeded();
-            succeededElapsedOffset = rpcStatus.getSucceededElapsed();
+            invokeOffset = rpcStatus.getTotal();
+            invokeElapsedOffset = rpcStatus.getTotalElapsed();
         }
 
         private double getCost() {
@@ -105,8 +114,10 @@ public class PeakEwmaLoadBalance extends AbstractLoadBalance {
             int active = rpcStatus.getActive();
             ewmaLock.unlock();
 
+            double costTemp = cost;
+
             //If we don't have any latency history, we penalize the host on the first probe.
-            return (cost == 0.0 && active != 0) ? PENALTY + active : cost * (active + 1);
+            return (costTemp < ZERO_COST && active != 0) ? PENALTY + active : costTemp * (active + 1);
         }
     }
 
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
new file mode 100644
index 0000000..0f23741
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance
@@ -0,0 +1 @@
+peakewma=org.apache.dubbo.rpc.cluster.loadbalance.PeakEwmaLoadBalance
diff --git a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java
index 77b7c8d..a21bcfe 100644
--- a/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java
+++ b/dubbo-cluster-extensions/dubbo-cluster-loadbalance-peakewma/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/PeakEwmaLoadBalanceTest.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.cluster.loadbalance;
 
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcStatus;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -77,6 +78,7 @@ public class PeakEwmaLoadBalanceTest extends LoadBalanceBaseTest {
 
     private Callable<Boolean> getTask(boolean needShake) {
         PeakEwmaLoadBalance lb = new PeakEwmaLoadBalance();
+        lb.setApplicationModel(ApplicationModel.defaultModel());
         return () -> {
             boolean needShakeTemp = needShake;
             for (int i = 0; i < INVOKE_NUM; i++) {