You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/09/22 09:39:50 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement][remote]load balance warm up (#3770)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new db663a1  [Improvement][remote]load balance warm up  (#3770)
db663a1 is described below

commit db663a13a37c6bc4e47286ebbc8efb2f20efa57b
Author: CalvinKirs <ac...@163.com>
AuthorDate: Tue Sep 22 17:39:39 2020 +0800

    [Improvement][remote]load balance warm up  (#3770)
    
    * [Improvement][remote]load balance  warm up
    
    * reformat code
    
    * reformat code
    
    * code smell
    
    * code smell
    
    * add test
    
    * add test
    
    * add test
    
    * add test
    
    * fix bug
    
    * fix bug
    
    * add docs
    
    * add host test
    
    * add host test
    
    * add host test
    
    * add docs
    
    * code reformat
    
    * code reformat
---
 .../dolphinscheduler/remote/utils/Constants.java   |  5 ++
 .../apache/dolphinscheduler/remote/utils/Host.java | 48 +++++++++++----
 .../master/dispatch/host/assign/HostWeight.java    | 35 +++++++----
 .../server/worker/registry/WorkerRegistry.java     | 44 ++++++++------
 .../host/assign/LowerWeightRoundRobinTest.java     | 51 +++++++++++++---
 .../dispatch/host/assign/RandomSelectorTest.java   | 16 ++---
 .../host/assign/RoundRobinSelectorTest.java        | 70 +++++++++++++---------
 .../HostTest.java}                                 | 34 ++++++-----
 pom.xml                                            |  1 +
 9 files changed, 207 insertions(+), 97 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 370467f..91d4ac2 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -53,4 +53,9 @@ public class Constants {
      */
     public static final String OS_NAME = System.getProperty("os.name");
 
+    /**
+     * warm up time
+     */
+    public static final int WARM_UP_TIME = 10 * 60 * 1000;
+
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index b905a9f..c18d02f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.remote.utils;
 
 import java.io.Serializable;
@@ -45,6 +46,11 @@ public class Host implements Serializable {
     private int weight;
 
     /**
+     * startTime
+     */
+    private long startTime;
+
+    /**
      * workGroup
      */
     private String workGroup;
@@ -58,19 +64,21 @@ public class Host implements Serializable {
         this.address = ip + ":" + port;
     }
 
-    public Host(String ip, int port, int weight) {
+    public Host(String ip, int port, int weight, long startTime) {
         this.ip = ip;
         this.port = port;
         this.address = ip + ":" + port;
-        this.weight = weight;
+        this.weight = getWarmUpWeight(weight, startTime);
+        this.startTime = startTime;
     }
 
-    public Host(String ip, int port, int weight,String workGroup) {
+    public Host(String ip, int port, int weight, long startTime, String workGroup) {
         this.ip = ip;
         this.port = port;
         this.address = ip + ":" + port;
-        this.weight = weight;
-        this.workGroup=workGroup;
+        this.weight = getWarmUpWeight(weight, startTime);
+        this.workGroup = workGroup;
+        this.startTime = startTime;
     }
 
     public String getAddress() {
@@ -98,6 +106,14 @@ public class Host implements Serializable {
         this.weight = weight;
     }
 
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
     public int getPort() {
         return port;
     }
@@ -133,8 +149,8 @@ public class Host implements Serializable {
         if (parts.length == 2) {
             host = new Host(parts[0], Integer.parseInt(parts[1]));
         }
-        if (parts.length == 3) {
-            host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
+        if (parts.length == 4) {
+            host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
         }
         return host;
     }
@@ -169,8 +185,20 @@ public class Host implements Serializable {
 
     @Override
     public String toString() {
-        return "Host{" +
-                "address='" + address + '\'' +
-                '}';
+        return "Host{"
+            + "address='" + address + '\''
+            + '}';
+    }
+
+    /**
+     * warm up
+     */
+    private int getWarmUpWeight(int weight, long startTime) {
+        long uptime = System.currentTimeMillis() - startTime;
+        //If the warm-up is not over, reduce the weight
+        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
+            return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
+        }
+        return weight;
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index ebceea7..298a62a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
+import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
 /**
@@ -37,9 +38,9 @@ public class HostWeight {
     private int currentWeight;
 
     public HostWeight(Host host, double cpu, double memory, double loadAverage) {
-        this.weight = calculateWeight(cpu, memory, loadAverage);
-        this.host = host ;
-        this.currentWeight = weight ;
+        this.weight = getWeight(cpu, memory, loadAverage, host);
+        this.host = host;
+        this.currentWeight = weight;
     }
 
     public int getCurrentWeight() {
@@ -60,14 +61,28 @@ public class HostWeight {
 
     @Override
     public String toString() {
-        return "HostWeight{" +
-                "host=" + host +
-                ", weight=" + weight +
-                ", currentWeight=" + currentWeight +
-                '}';
+        return "HostWeight{"
+            + "host=" + host
+            + ", weight=" + weight
+            + ", currentWeight=" + currentWeight
+            + '}';
     }
 
-    private int calculateWeight(double cpu, double memory, double loadAverage){
-        return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+    private int getWeight(double cpu, double memory, double loadAverage, Host host) {
+        int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+        return getWarmUpWeight(host, calculateWeight);
+
+    }
+
+    /**
+     * If the warm-up is not over, add the weight
+     */
+    private int getWarmUpWeight(Host host, int weight) {
+        long startTime = host.getStartTime();
+        long uptime = System.currentTimeMillis() - startTime;
+        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
+            return (int) ((weight * Constants.WARM_UP_TIME) / uptime);
+        }
+        return weight;
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 36998fa..904ea3a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -14,19 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
 
-import javax.annotation.PostConstruct;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +48,6 @@ import org.springframework.stereotype.Service;
 
 import com.google.common.collect.Sets;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 
 /**
  * worker registry
@@ -111,10 +116,10 @@ public class WorkerRegistry {
         }
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
-                this.workerConfig.getWorkerReservedMemory(),
-                this.workerConfig.getWorkerMaxCpuloadAvg(),
-                workerZkPaths,
-                this.zookeeperRegistryCenter);
+            this.workerConfig.getWorkerReservedMemory(),
+            this.workerConfig.getWorkerMaxCpuloadAvg(),
+            workerZkPaths,
+            this.zookeeperRegistryCenter);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
@@ -142,6 +147,7 @@ public class WorkerRegistry {
         String address = getLocalAddress();
         String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
         String weight = getWorkerWeight();
+        String workerStartTime = COLON + System.currentTimeMillis();
 
         for (String workGroup : this.workerGroups) {
             StringBuilder workerZkPathBuilder = new StringBuilder(100);
@@ -153,6 +159,7 @@ public class WorkerRegistry {
             workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
             workerZkPathBuilder.append(address);
             workerZkPathBuilder.append(weight);
+            workerZkPathBuilder.append(workerStartTime);
             workerZkPaths.add(workerZkPathBuilder.toString());
         }
         return workerZkPaths;
@@ -162,13 +169,14 @@ public class WorkerRegistry {
      * get local address
      */
     private String getLocalAddress() {
-        return NetUtils.getHost() + ":" + workerConfig.getListenPort();
+        return NetUtils.getHost() + COLON + workerConfig.getListenPort();
     }
 
     /**
      * get Worker Weight
      */
     private String getWorkerWeight() {
-        return ":" + workerConfig.getWeight();
+        return COLON + workerConfig.getWeight();
     }
+
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index fadaa84..fd5dda0 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -14,9 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
 import org.apache.dolphinscheduler.remote.utils.Host;
+
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -27,15 +30,47 @@ public class LowerWeightRoundRobinTest {
 
 
     @Test
-    public void testSelect(){
+    public void testSelect() {
         Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
-        sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
-        System.out.println(sources);
+        sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
+        sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
+
+        LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
+        HostWeight result;
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+    }
+
+    @Test
+    public void testWarmUpSelect() {
+        Collection<HostWeight> sources = new ArrayList<>();
+        sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
+        sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
+
         LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
-        for(int i = 0; i < 100; i ++){
-            System.out.println(roundRobin.select(sources));
-        }
+        HostWeight result;
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.4", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.4", result.getHost().getIp());
+        result = roundRobin.select(sources);
+        Assert.assertEquals("192.158.2.1", result.getHost().getIp());
     }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index f25a227..14aa7b8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.commons.lang.ObjectUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,22 +31,22 @@ import java.util.Collections;
 public class RandomSelectorTest {
 
     @Test(expected = IllegalArgumentException.class)
-    public void testSelectWithIllegalArgumentException(){
+    public void testSelectWithIllegalArgumentException() {
         RandomSelector selector = new RandomSelector();
-        selector.select(Collections.EMPTY_LIST);
+        selector.select(null);
     }
 
     @Test
-    public void testSelect1(){
+    public void testSelect1() {
         RandomSelector selector = new RandomSelector();
-        Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20)));
+        Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
         Assert.assertNotNull(result);
     }
 
     @Test
-    public void testSelect(){
+    public void testSelect() {
         RandomSelector selector = new RandomSelector();
-        Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20)));
+        Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
         Assert.assertNotNull(result);
 
     }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index ed62caa..9e41cd6 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -14,16 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * round robin selector
@@ -33,43 +33,59 @@ public class RoundRobinSelectorTest {
     @Test(expected = IllegalArgumentException.class)
     public void testSelectWithIllegalArgumentException() {
         RoundRobinSelector selector = new RoundRobinSelector();
-        selector.select(Collections.EMPTY_LIST);
+        selector.select(null);
     }
 
     @Test
     public void testSelect1() {
         RoundRobinSelector selector = new RoundRobinSelector();
-        Host result = null;
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Host result;
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
         Assert.assertEquals("192.168.1.1", result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
         Assert.assertEquals("192.168.1.2", result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
         Assert.assertEquals("192.168.1.1", result.getIp());
         // add new host
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.2", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
         Assert.assertEquals("192.168.1.1", result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.3", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
         Assert.assertEquals("192.168.1.2", result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.1",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.3",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.1",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.2",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.1",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.3",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+            new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.3", result.getIp());
         // remove host3
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.1",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.2",result.getIp());
-        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
-        Assert.assertEquals("192.168.1.1",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.2", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+
+    }
+
+    @Test
+    public void testWarmUpRoundRobinSelector() {
+        RoundRobinSelector selector = new RoundRobinSelector();
+        Host result;
+        result = selector.select(
+            Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
+        Assert.assertEquals("192.168.1.2", result.getIp());
 
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
similarity index 55%
copy from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
copy to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
index fadaa84..6273569 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
@@ -14,28 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.junit.Test;
+package org.apache.dolphinscheduler.server.utils;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import org.apache.dolphinscheduler.remote.utils.Host;
 
+import org.junit.Assert;
+import org.junit.Test;
 
-public class LowerWeightRoundRobinTest {
+/**
+ * host test
+ */
+public class HostTest {
 
+    @Test
+    public void testHostWarmUp() {
+        Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
+        Assert.assertEquals(50, host.getWeight());
+        host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
+        Assert.assertEquals(100, host.getWeight());
+    }
 
     @Test
-    public void testSelect(){
-        Collection<HostWeight> sources = new ArrayList<>();
-        sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
-        sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
-        sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
-        System.out.println(sources);
-        LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
-        for(int i = 0; i < 100; i ++){
-            System.out.println(roundRobin.select(sources));
-        }
+    public void testHost() {
+        Host host = Host.of("192.158.2.2:22");
+        Assert.assertEquals(22, host.getPort());
     }
 }
diff --git a/pom.xml b/pom.xml
index 207518c..c8cb5a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -832,6 +832,7 @@
                         <include>**/server/register/ZookeeperNodeManagerTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/server/utils/ExecutionContextTestUtils.java</include>
+                        <include>**/server/utils/HostTest.java</include>
                         <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
                         <include>**/server/utils/LogUtilsTest.java</include>
                         <include>**/server/utils/ParamUtilsTest.java</include>