You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/09/22 09:39:50 UTC
[incubator-dolphinscheduler] branch dev updated:
[Improvement][remote]load balance warm up (#3770)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new db663a1 [Improvement][remote]load balance warm up (#3770)
db663a1 is described below
commit db663a13a37c6bc4e47286ebbc8efb2f20efa57b
Author: CalvinKirs <ac...@163.com>
AuthorDate: Tue Sep 22 17:39:39 2020 +0800
[Improvement][remote]load balance warm up (#3770)
* [Improvement][remote]load balance warm up
* reformat code
* reformat code
* code smell
* code smell
* add test
* add test
* add test
* add test
* fix bug
* fix bug
* add docs
* add host test
* add host test
* add host test
* add docs
* code reformat
* code reformat
---
.../dolphinscheduler/remote/utils/Constants.java | 5 ++
.../apache/dolphinscheduler/remote/utils/Host.java | 48 +++++++++++----
.../master/dispatch/host/assign/HostWeight.java | 35 +++++++----
.../server/worker/registry/WorkerRegistry.java | 44 ++++++++------
.../host/assign/LowerWeightRoundRobinTest.java | 51 +++++++++++++---
.../dispatch/host/assign/RandomSelectorTest.java | 16 ++---
.../host/assign/RoundRobinSelectorTest.java | 70 +++++++++++++---------
.../HostTest.java} | 34 ++++++-----
pom.xml | 1 +
9 files changed, 207 insertions(+), 97 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
index 370467f..91d4ac2 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -53,4 +53,9 @@ public class Constants {
*/
public static final String OS_NAME = System.getProperty("os.name");
+ /**
+ * warm-up period in milliseconds (10 minutes)
+ */
+ public static final int WARM_UP_TIME = 10 * 60 * 1000;
+
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index b905a9f..c18d02f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable;
@@ -45,6 +46,11 @@ public class Host implements Serializable {
private int weight;
/**
+ * host start time, in epoch milliseconds
+ */
+ private long startTime;
+
+ /**
* workGroup
*/
private String workGroup;
@@ -58,19 +64,21 @@ public class Host implements Serializable {
this.address = ip + ":" + port;
}
- public Host(String ip, int port, int weight) {
+ public Host(String ip, int port, int weight, long startTime) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
- this.weight = weight;
+ this.weight = getWarmUpWeight(weight, startTime);
+ this.startTime = startTime;
}
- public Host(String ip, int port, int weight,String workGroup) {
+ public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
- this.weight = weight;
- this.workGroup=workGroup;
+ this.weight = getWarmUpWeight(weight, startTime);
+ this.workGroup = workGroup;
+ this.startTime = startTime;
}
public String getAddress() {
@@ -98,6 +106,14 @@ public class Host implements Serializable {
this.weight = weight;
}
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
public int getPort() {
return port;
}
@@ -133,8 +149,8 @@ public class Host implements Serializable {
if (parts.length == 2) {
host = new Host(parts[0], Integer.parseInt(parts[1]));
}
- if (parts.length == 3) {
- host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
+ if (parts.length == 4) {
+ host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
}
return host;
}
@@ -169,8 +185,20 @@ public class Host implements Serializable {
@Override
public String toString() {
- return "Host{" +
- "address='" + address + '\'' +
- '}';
+ return "Host{"
+ + "address='" + address + '\''
+ + '}';
+ }
+
+ /**
+ * Scale the weight down in proportion to uptime while the host is still within the warm-up period
+ */
+ private int getWarmUpWeight(int weight, long startTime) {
+ long uptime = System.currentTimeMillis() - startTime;
+ //If the warm-up is not over, reduce the weight
+ if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
+ return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
+ }
+ return weight;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
index ebceea7..298a62a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
@@ -37,9 +38,9 @@ public class HostWeight {
private int currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
- this.weight = calculateWeight(cpu, memory, loadAverage);
- this.host = host ;
- this.currentWeight = weight ;
+ this.weight = getWeight(cpu, memory, loadAverage, host);
+ this.host = host;
+ this.currentWeight = weight;
}
public int getCurrentWeight() {
@@ -60,14 +61,28 @@ public class HostWeight {
@Override
public String toString() {
- return "HostWeight{" +
- "host=" + host +
- ", weight=" + weight +
- ", currentWeight=" + currentWeight +
- '}';
+ return "HostWeight{"
+ + "host=" + host
+ + ", weight=" + weight
+ + ", currentWeight=" + currentWeight
+ + '}';
}
- private int calculateWeight(double cpu, double memory, double loadAverage){
- return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+ private int getWeight(double cpu, double memory, double loadAverage, Host host) {
+ int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
+ return getWarmUpWeight(host, calculateWeight);
+
+ }
+
+ /**
+ * If the warm-up is not over, increase the weight in inverse proportion to the uptime
+ */
+ private int getWarmUpWeight(Host host, int weight) {
+ long startTime = host.getStartTime();
+ long uptime = System.currentTimeMillis() - startTime;
+ if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
+ return (int) ((weight * Constants.WARM_UP_TIME) / uptime);
+ }
+ return weight;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 36998fa..904ea3a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.registry;
-import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.SLASH;
-import javax.annotation.PostConstruct;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +48,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* worker registry
@@ -111,10 +116,10 @@ public class WorkerRegistry {
}
HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
- this.workerConfig.getWorkerReservedMemory(),
- this.workerConfig.getWorkerMaxCpuloadAvg(),
- workerZkPaths,
- this.zookeeperRegistryCenter);
+ this.workerConfig.getWorkerReservedMemory(),
+ this.workerConfig.getWorkerMaxCpuloadAvg(),
+ workerZkPaths,
+ this.zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
@@ -142,6 +147,7 @@ public class WorkerRegistry {
String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
String weight = getWorkerWeight();
+ String workerStartTime = COLON + System.currentTimeMillis();
for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100);
@@ -153,6 +159,7 @@ public class WorkerRegistry {
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(address);
workerZkPathBuilder.append(weight);
+ workerZkPathBuilder.append(workerStartTime);
workerZkPaths.add(workerZkPathBuilder.toString());
}
return workerZkPaths;
@@ -162,13 +169,14 @@ public class WorkerRegistry {
* get local address
*/
private String getLocalAddress() {
- return NetUtils.getHost() + ":" + workerConfig.getListenPort();
+ return NetUtils.getHost() + COLON + workerConfig.getListenPort();
}
/**
* get Worker Weight
*/
private String getWorkerWeight() {
- return ":" + workerConfig.getWeight();
+ return COLON + workerConfig.getWeight();
}
+
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
index fadaa84..fd5dda0 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
@@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
+
+import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -27,15 +30,47 @@ public class LowerWeightRoundRobinTest {
@Test
- public void testSelect(){
+ public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
- sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
- System.out.println(sources);
+ sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
+ sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
+ sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
+
+ LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
+ HostWeight result;
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ }
+
+ @Test
+ public void testWarmUpSelect() {
+ Collection<HostWeight> sources = new ArrayList<>();
+ sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
+ sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
+ sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
+ sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
+
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
- for(int i = 0; i < 100; i ++){
- System.out.println(roundRobin.select(sources));
- }
+ HostWeight result;
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.4", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.2", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.4", result.getHost().getIp());
+ result = roundRobin.select(sources);
+ Assert.assertEquals("192.158.2.1", result.getHost().getIp());
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index f25a227..14aa7b8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.commons.lang.ObjectUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
+
import org.junit.Assert;
import org.junit.Test;
@@ -31,22 +31,22 @@ import java.util.Collections;
public class RandomSelectorTest {
@Test(expected = IllegalArgumentException.class)
- public void testSelectWithIllegalArgumentException(){
+ public void testSelectWithIllegalArgumentException() {
RandomSelector selector = new RandomSelector();
- selector.select(Collections.EMPTY_LIST);
+ selector.select(null);
}
@Test
- public void testSelect1(){
+ public void testSelect1() {
RandomSelector selector = new RandomSelector();
- Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20)));
+ Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result);
}
@Test
- public void testSelect(){
+ public void testSelect() {
RandomSelector selector = new RandomSelector();
- Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20)));
+ Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result);
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index ed62caa..9e41cd6 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* round robin selector
@@ -33,43 +33,59 @@ public class RoundRobinSelectorTest {
@Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException() {
RoundRobinSelector selector = new RoundRobinSelector();
- selector.select(Collections.EMPTY_LIST);
+ selector.select(null);
}
@Test
public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector();
- Host result = null;
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+ Host result;
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
// add new host
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.1", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.2", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.3", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.1", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.1",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.3",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.1",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.2",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.1",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.3",result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.1", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
+ new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.3", result.getIp());
// remove host3
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.1",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.2",result.getIp());
- result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris")));
- Assert.assertEquals("192.168.1.1",result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.1", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.2", result.getIp());
+ result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
+ Assert.assertEquals("192.168.1.1", result.getIp());
+
+ }
+
+ @Test
+ public void testWarmUpRoundRobinSelector() {
+ RoundRobinSelector selector = new RoundRobinSelector();
+ Host result;
+ result = selector.select(
+ Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
+ Assert.assertEquals("192.168.1.2", result.getIp());
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
similarity index 55%
copy from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
copy to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
index fadaa84..6273569 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
@@ -14,28 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.junit.Test;
+package org.apache.dolphinscheduler.server.utils;
-import java.util.ArrayList;
-import java.util.Collection;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.junit.Assert;
+import org.junit.Test;
-public class LowerWeightRoundRobinTest {
+/**
+ * host test
+ */
+public class HostTest {
+ @Test
+ public void testHostWarmUp() {
+ Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
+ Assert.assertEquals(50, host.getWeight());
+ host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
+ Assert.assertEquals(100, host.getWeight());
+ }
@Test
- public void testSelect(){
- Collection<HostWeight> sources = new ArrayList<>();
- sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
- sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
- sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
- System.out.println(sources);
- LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
- for(int i = 0; i < 100; i ++){
- System.out.println(roundRobin.select(sources));
- }
+ public void testHost() {
+ Host host = Host.of("192.158.2.2:22");
+ Assert.assertEquals(22, host.getPort());
}
}
diff --git a/pom.xml b/pom.xml
index 207518c..c8cb5a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -832,6 +832,7 @@
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
+ <include>**/server/utils/HostTest.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>