You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/05/19 03:38:50 UTC
[incubator-dolphinscheduler] branch dev-1.3.0 updated: master
select worker filter high load worker #2704 (#2733)
This is an automated email from the ASF dual-hosted git repository.
lgcareer pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 2330cc8 master select worker filter high load worker #2704 (#2733)
2330cc8 is described below
commit 2330cc8872ab7852cd91f0f64815c73679d59b9e
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue May 19 11:38:41 2020 +0800
master select worker filter high load worker #2704 (#2733)
* add LoggerServerTest UT
* add LoggerServerTest UT
* add LoggerServerTest UT
add RemoveTaskLogRequestCommandTest UT
add RemoveTaskLogResponseCommandTest
* master select worker filter high load worker #2704
* master select worker filter high load worker #2704
* master select worker filter high load worker #2704
* master select worker filter high load worker #2704
* master select worker filter high load worker #2704
* master select worker filter high load worker #2704
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../apache/dolphinscheduler/common/Constants.java | 6 +-
.../dolphinscheduler/common/utils/ResInfo.java | 4 +-
.../dispatch/host/LowerWeightHostManager.java | 25 ++++---
.../server/master/registry/MasterRegistry.java | 31 +++-----
.../server/registry/HeartBeatTask.java | 82 ++++++++++++++++++++++
.../server/worker/config/WorkerConfig.java | 2 +-
.../server/worker/registry/WorkerRegistry.java | 31 +++-----
.../src/main/resources/master.properties | 4 +-
.../src/main/resources/worker.properties | 4 +-
.../server/master/registry/MasterRegistryTest.java | 4 +-
.../server/worker/registry/WorkerRegistryTest.java | 4 +-
11 files changed, 132 insertions(+), 65 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index effa4f0..d958932 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -507,7 +507,7 @@ public final class Constants {
/**
* heartbeat for zk info length
*/
- public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 5;
+ public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 9;
/**
@@ -961,4 +961,8 @@ public final class Constants {
*/
public static final String PLUGIN_JAR_SUFFIX = ".jar";
+ public static final int NORAML_NODE_STATUS = 0;
+ public static final int ABNORMAL_NODE_STATUS = 1;
+
+
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
index feadb68..9c1d880 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
@@ -107,8 +107,8 @@ public class ResInfo {
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]),
Double.parseDouble(masterArray[1]),
Double.parseDouble(masterArray[2])));
- masterServer.setCreateTime(DateUtils.stringToDate(masterArray[3]));
- masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[4]));
+ masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
+ masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
return masterServer;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 99cae69..5989519 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
@@ -68,7 +69,7 @@ public class LowerWeightHostManager extends CommonHostManager {
/**
* worker host weights
*/
- private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
+ private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
/**
* worker group host lock
@@ -83,7 +84,7 @@ public class LowerWeightHostManager extends CommonHostManager {
@PostConstruct
public void init(){
this.selector = new LowerWeightRoundRobin();
- this.workerHostWeights = new ConcurrentHashMap<>();
+ this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
@@ -106,9 +107,8 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if(CollectionUtils.isNotEmpty(workerHostWeights)){
return selector.select(workerHostWeights).getHost();
- } else{
- return roundRobinHostManager.select(context);
}
+ return new Host();
}
@Override
@@ -119,8 +119,8 @@ public class LowerWeightHostManager extends CommonHostManager {
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
lock.lock();
try {
- workerHostWeights.clear();
- workerHostWeights.putAll(workerHostWeights);
+ workerHostWeightsMap.clear();
+ workerHostWeightsMap.putAll(workerHostWeights);
} finally {
lock.unlock();
}
@@ -129,7 +129,7 @@ public class LowerWeightHostManager extends CommonHostManager {
private Set<HostWeight> getWorkerHostWeights(String workerGroup){
lock.lock();
try {
- return workerHostWeights.get(workerGroup);
+ return workerHostWeightsMap.get(workerGroup);
} finally {
lock.unlock();
}
@@ -150,8 +150,17 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
- if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){
+ if(StringUtils.isNotEmpty(heartbeat)
+ && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA);
+
+ int status = Integer.parseInt(parts[8]);
+ if (status == Constants.ABNORMAL_NODE_STATUS){
+ logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
+ Double.parseDouble(parts[3]) , Double.parseDouble(parts[2]));
+ continue;
+ }
+
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index b658298..de6d3bc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,7 +96,13 @@ public class MasterRegistry {
}
});
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
- this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+ masterConfig.getMasterReservedMemory(),
+ masterConfig.getMasterMaxCpuloadAvg(),
+ getMasterPath(),
+ zookeeperRegistryCenter);
+
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
@@ -126,26 +133,4 @@ public class MasterRegistry {
private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort();
}
-
- /**
- * hear beat task
- */
- class HeartBeatTask implements Runnable{
-
- @Override
- public void run() {
- try {
- StringBuilder builder = new StringBuilder(100);
- builder.append(OSUtils.cpuUsage()).append(COMMA);
- builder.append(OSUtils.memoryUsage()).append(COMMA);
- builder.append(OSUtils.loadAverage()).append(COMMA);
- builder.append(startTime).append(COMMA);
- builder.append(DateUtils.dateToString(new Date()));
- String masterPath = getMasterPath();
- zookeeperRegistryCenter.getZookeeperCachedOperator().update(masterPath, builder.toString());
- } catch (Throwable ex){
- logger.error("error write master heartbeat info", ex);
- }
- }
- }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
new file mode 100644
index 0000000..6d0eae9
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
+
+/**
+ * Heartbeat job that publishes the state of the local node to ZooKeeper.
+ *
+ * Each run writes one comma-separated record with nine fields to the
+ * configured path, in this order:
+ * cpu usage, memory usage, load average, available physical memory,
+ * max cpu load average threshold, reserved memory threshold, start time,
+ * time of this heartbeat, node status.
+ *
+ * The status is {@code Constants.ABNORMAL_NODE_STATUS} when the available
+ * physical memory is below the reserved memory or the load average is above
+ * the max cpu load average; otherwise it is {@code Constants.NORAML_NODE_STATUS}.
+ *
+ * NOTE(review): the class extends Thread, but only run() is needed when it is
+ * handed to a scheduled executor; implementing Runnable would be sufficient.
+ * The supertype is kept unchanged for compatibility.
+ */
+public class HeartBeatTask extends Thread{
+
+    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+
+    /** start time of the node, written into the heartbeat as given */
+    private final String startTime;
+    /** lower bound for available physical memory (log message suggests the unit is G - confirm) */
+    private final double reservedMemory;
+    /** upper bound for the load average */
+    private final double maxCpuloadAvg;
+    /** ZooKeeper path the heartbeat record is written to */
+    private final String heartBeatPath;
+    private final ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    /**
+     * @param startTime start time of the node, already formatted as text
+     * @param reservedMemory node is reported abnormal when available physical memory drops below this value
+     * @param maxCpuloadAvg node is reported abnormal when the load average exceeds this value
+     * @param heartBeatPath ZooKeeper path that receives the heartbeat record
+     * @param zookeeperRegistryCenter registry center used to update the path
+     */
+    public HeartBeatTask(String startTime,
+                         double reservedMemory,
+                         double maxCpuloadAvg,
+                         String heartBeatPath,
+                         ZookeeperRegistryCenter zookeeperRegistryCenter){
+        this.startTime = startTime;
+        this.reservedMemory = reservedMemory;
+        this.maxCpuloadAvg = maxCpuloadAvg;
+        this.heartBeatPath = heartBeatPath;
+        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+    }
+
+    /**
+     * Samples the node resources once, derives the status from that sample
+     * and writes the heartbeat record to ZooKeeper.
+     */
+    @Override
+    public void run() {
+        try {
+            // sample once so that the published values are the ones the status was derived from
+            double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+            double loadAverage = OSUtils.loadAverage();
+
+            int status = Constants.NORAML_NODE_STATUS;
+            if (availablePhysicalMemorySize < reservedMemory
+                    || loadAverage > maxCpuloadAvg) {
+                logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage);
+                status = Constants.ABNORMAL_NODE_STATUS;
+            }
+
+            // field order is part of the contract: readers index into the split record
+            StringBuilder builder = new StringBuilder(100);
+            builder.append(OSUtils.cpuUsage()).append(Constants.COMMA);
+            builder.append(OSUtils.memoryUsage()).append(Constants.COMMA);
+            builder.append(loadAverage).append(Constants.COMMA);
+            builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
+            builder.append(maxCpuloadAvg).append(Constants.COMMA);
+            builder.append(reservedMemory).append(Constants.COMMA);
+            builder.append(startTime).append(Constants.COMMA);
+            builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA);
+            builder.append(status);
+            zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
+        } catch (Throwable ex){
+            // never let a failure escape: a scheduled executor stops re-running a task that throws
+            logger.error("error write heartbeat info", ex);
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 7f4d93f..1a31fa0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -38,7 +38,7 @@ public class WorkerConfig {
@Value("${worker.max.cpuload.avg:-1}")
private int workerMaxCpuloadAvg;
- @Value("${worker.reserved.memory:0.5}")
+ @Value("${worker.reserved.memory:0.3}")
private double workerReservedMemory;
@Value("${worker.group: default}")
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 4d72340..f7093a1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger;
@@ -102,7 +103,13 @@ public class WorkerRegistry {
}
});
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
- this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+
+ HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+ workerConfig.getWorkerReservedMemory(),
+ workerConfig.getWorkerMaxCpuloadAvg(),
+ getWorkerPath(),
+ zookeeperRegistryCenter);
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval);
}
@@ -143,26 +150,4 @@ public class WorkerRegistry {
private String getLocalAddress(){
return OSUtils.getHost() + ":" + workerConfig.getListenPort();
}
-
- /**
- * hear beat task
- */
- class HeartBeatTask implements Runnable{
-
- @Override
- public void run() {
- try {
- StringBuilder builder = new StringBuilder(100);
- builder.append(OSUtils.cpuUsage()).append(COMMA);
- builder.append(OSUtils.memoryUsage()).append(COMMA);
- builder.append(OSUtils.loadAverage()).append(COMMA);
- builder.append(startTime).append(COMMA);
- builder.append(DateUtils.dateToString(new Date()));
- String workerPath = getWorkerPath();
- zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString());
- } catch (Throwable ex){
- logger.error("error write worker heartbeat info", ex);
- }
- }
- }
}
diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties
index 2f75aa5..f09f469 100644
--- a/dolphinscheduler-server/src/main/resources/master.properties
+++ b/dolphinscheduler-server/src/main/resources/master.properties
@@ -31,8 +31,8 @@
#master.task.commit.interval=1000
-# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
-#master.max.cpuload.avg=100
+# the master server only works while the cpu load average is below this value. default value -1 : the number of cpu cores * 2
+#master.max.cpuload.avg=-1
# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
#master.reserved.memory=0.3
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties
index 36bc132..9bbf901 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -24,8 +24,8 @@
# submit the number of tasks at a time TODO
#worker.fetch.task.num = 3
-# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
-#worker.max.cpuload.avg=100
+# the worker server only works while the cpu load average is below this value. default value -1 : the number of cpu cores * 2
+#worker.max.cpuload.avg= -1
# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
#worker.reserved.memory=0.3
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index a482029..9d90f20 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -33,7 +33,7 @@ import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
+import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/**
* master registry test
*/
@@ -57,7 +57,7 @@ public class MasterRegistryTest {
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
- Assert.assertEquals(5, heartbeat.split(",").length);
+ Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
}
@Test
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index d5f836e..6ecff51 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/**
* worker registry test
*/
@@ -61,7 +63,7 @@ public class WorkerRegistryTest {
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort());
TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
- Assert.assertEquals(5, heartbeat.split(",").length);
+ Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
}
@Test