You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/31 08:20:34 UTC
[dolphinscheduler] branch dev updated: Refactor heart beat task, use json to serialize/deserialize (#11702)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 67e7f88d8b Refactor heart beat task, use json to serialize/deserialize (#11702)
67e7f88d8b is described below
commit 67e7f88d8b7f65696d36fcb3c85c2d0b7836cae8
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Aug 31 16:20:23 2022 +0800
Refactor heart beat task, use json to serialize/deserialize (#11702)
* Refactor heart beat task, use json to serialize/deserialize
---
.../api/service/impl/WorkerGroupServiceImpl.java | 10 +-
.../common/model/BaseHeartBeatTask.java | 81 +++++++
.../dolphinscheduler/common/model/HeartBeat.java | 19 +-
.../common/model/MasterHeartBeat.java | 35 +--
.../dolphinscheduler/common/model/Server.java | 92 +-------
.../common/model/WorkerHeartBeat.java | 39 +--
.../dolphinscheduler/common/utils/HeartBeat.java | 261 ---------------------
.../dolphinscheduler/common/utils/JSONUtils.java | 46 ++--
.../common/utils/HeartBeatTest.java | 77 ------
.../server/master/config/MasterConfig.java | 7 +-
.../master/dispatch/host/CommonHostManager.java | 24 +-
.../dispatch/host/LowerWeightHostManager.java | 39 ++-
.../master/registry/MasterHeartBeatTask.java | 70 ------
.../master/registry/MasterRegistryClient.java | 56 ++---
.../server/master/registry/ServerNodeManager.java | 160 +++++--------
.../master/registry/WorkerInfoChangeListener.java | 4 +-
.../server/master/task/MasterHeartBeatTask.java | 71 ++++++
.../dispatch/host/RoundRobinHostManagerTest.java | 2 +
.../master/registry/MasterRegistryClientTest.java | 8 +
.../service/registry/RegistryClient.java | 36 ++-
.../server/worker/WorkerServer.java | 4 +
.../server/worker/config/WorkerConfig.java | 18 ++
.../worker/registry/WorkerHeartBeatTask.java | 79 -------
.../worker/registry/WorkerRegistryClient.java | 110 ++-------
.../server/worker/task/WorkerHeartBeatTask.java | 107 +++++++++
25 files changed, 522 insertions(+), 933 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index eee031942c..732ecd8e7b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -28,7 +28,9 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -318,9 +320,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue);
- wg.setCreateTime(new Date(heartBeat.getStartupTime()));
- wg.setUpdateTime(new Date(heartBeat.getReportTime()));
+ WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
+ wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
+ wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
wg.setSystemDefault(true);
if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
new file mode 100644
index 0000000000..557f7ec86b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.model;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+
+@Slf4j
+public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {
+
+ private final String threadName;
+ private final long heartBeatInterval;
+
+ protected boolean runningFlag;
+
+ public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
+ super(threadName);
+ this.threadName = threadName;
+ this.heartBeatInterval = heartBeatInterval;
+ this.runningFlag = true;
+ }
+
+ @Override
+ public synchronized void start() {
+ log.info("Starting {}", threadName);
+ super.start();
+ log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval);
+ }
+
+ @Override
+ public void run() {
+ while (runningFlag) {
+ try {
+ if (!ServerLifeCycleManager.isRunning()) {
+ log.info("The current server status is {}, will not write heartBeatInfo into registry", ServerLifeCycleManager.getServerStatus());
+ continue;
+ }
+ T heartBeat = getHeartBeat();
+ writeHeartBeat(heartBeat);
+ } catch (Exception ex) {
+ log.error("{} task execute failed", threadName, ex);
+ } finally {
+ try {
+ Thread.sleep(heartBeatInterval);
+ } catch (InterruptedException e) {
+ handleInterruptException(e);
+ }
+ }
+ }
+ }
+
+ public void shutdown() {
+ log.warn("{} task finished", threadName);
+ runningFlag = false;
+ }
+
+ private void handleInterruptException(InterruptedException ex) {
+ log.warn("{} has been interrupted", threadName, ex);
+ Thread.currentThread().interrupt();
+ }
+
+ public abstract T getHeartBeat();
+
+ public abstract void writeHeartBeat(T heartBeat);
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
similarity index 57%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
index f885a6fba0..1a1d1610e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
@@ -15,22 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.registry;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
-
- /**
- * Used to notify the change of worker info.
- *
- * @param workerGroups worker groups map, key is worker group name, value is worker address.
- * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
- */
- void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+package org.apache.dolphinscheduler.common.model;
+public interface HeartBeat {
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
similarity index 57%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index f885a6fba0..95ece3522e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -15,22 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.common.model;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
-
- /**
- * Used to notify the change of worker info.
- *
- * @param workerGroups worker groups map, key is worker group name, value is worker address.
- * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
- */
- void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MasterHeartBeat implements HeartBeat {
+ private long startupTime;
+ private long reportTime;
+ private double cpuUsage;
+ private double memoryUsage;
+ private double loadAverage;
+ private double availablePhysicalMemorySize;
+ private double maxCpuloadAvg;
+ private double reservedMemory;
+ private int processId;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
index 4bd4648e93..743979eed7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
@@ -17,31 +17,19 @@
package org.apache.dolphinscheduler.common.model;
+import lombok.Data;
+
import java.util.Date;
-/**
- * server
- */
+@Data
public class Server {
- /**
- * id
- */
private int id;
- /**
- * host
- */
private String host;
- /**
- * port
- */
private int port;
- /**
- * master directory in zookeeper
- */
private String zkDirectory;
/**
@@ -49,82 +37,8 @@ public class Server {
*/
private String resInfo;
- /**
- * create time
- */
private Date createTime;
- /**
- * laster heart beat time
- */
private Date lastHeartbeatTime;
- public int getId() {
- return id;
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public String getZkDirectory() {
- return zkDirectory;
- }
-
- public void setZkDirectory(String zkDirectory) {
- this.zkDirectory = zkDirectory;
- }
-
- public Date getLastHeartbeatTime() {
- return lastHeartbeatTime;
- }
-
- public void setLastHeartbeatTime(Date lastHeartbeatTime) {
- this.lastHeartbeatTime = lastHeartbeatTime;
- }
-
- public String getResInfo() {
- return resInfo;
- }
-
- public void setResInfo(String resInfo) {
- this.resInfo = resInfo;
- }
-
- @Override
- public String toString() {
- return "MasterServer{" +
- "id=" + id +
- ", host='" + host + '\'' +
- ", port=" + port +
- ", zkDirectory='" + zkDirectory + '\'' +
- ", resInfo='" + resInfo + '\'' +
- ", createTime=" + createTime +
- ", lastHeartbeatTime=" + lastHeartbeatTime +
- '}';
- }
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
similarity index 50%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index f885a6fba0..4bb765d180 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -15,22 +15,33 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.common.model;
-import java.util.Map;
-import java.util.Set;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkerHeartBeat implements HeartBeat {
+
+ private long startupTime;
+ private long reportTime;
+ private double cpuUsage;
+ private double memoryUsage;
+ private double loadAverage;
+ private double availablePhysicalMemorySize;
+ private double maxCpuloadAvg;
+ private double reservedMemory;
+ private int serverStatus;
+ private int processId;
+
+ private int workerHostWeight; // worker host weight
+ private int workerWaitingTaskCount; // worker waiting task count
+ private int workerExecThreadCount; // worker thread pool thread count
- /**
- * Used to notify the change of worker info.
- *
- * @param workerGroups worker groups map, key is worker group name, value is worker address.
- * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
- */
- void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
deleted file mode 100644
index ecfa814cc6..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HeartBeat {
-
- private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);
-
- private long startupTime;
- private long reportTime;
- private double cpuUsage;
- private double memoryUsage;
- private double loadAverage;
- private double availablePhysicalMemorySize;
- private double maxCpuloadAvg;
- private double reservedMemory;
- private int serverStatus;
- private int processId;
-
- private int workerHostWeight; // worker host weight
- private int workerWaitingTaskCount; // worker waiting task count
- private int workerExecThreadCount; // worker thread pool thread count
-
- private double diskAvailable;
-
- public double getDiskAvailable() {
- return diskAvailable;
- }
-
- public void setDiskAvailable(double diskAvailable) {
- this.diskAvailable = diskAvailable;
- }
-
- public long getStartupTime() {
- return startupTime;
- }
-
- public void setStartupTime(long startupTime) {
- this.startupTime = startupTime;
- }
-
- public long getReportTime() {
- return reportTime;
- }
-
- public void setReportTime(long reportTime) {
- this.reportTime = reportTime;
- }
-
- public double getCpuUsage() {
- return cpuUsage;
- }
-
- public void setCpuUsage(double cpuUsage) {
- this.cpuUsage = cpuUsage;
- }
-
- public double getMemoryUsage() {
- return memoryUsage;
- }
-
- public void setMemoryUsage(double memoryUsage) {
- this.memoryUsage = memoryUsage;
- }
-
- public double getLoadAverage() {
- return loadAverage;
- }
-
- public void setLoadAverage(double loadAverage) {
- this.loadAverage = loadAverage;
- }
-
- public double getAvailablePhysicalMemorySize() {
- return availablePhysicalMemorySize;
- }
-
- public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) {
- this.availablePhysicalMemorySize = availablePhysicalMemorySize;
- }
-
- public double getMaxCpuloadAvg() {
- return maxCpuloadAvg;
- }
-
- public void setMaxCpuloadAvg(double maxCpuloadAvg) {
- this.maxCpuloadAvg = maxCpuloadAvg;
- }
-
- public double getReservedMemory() {
- return reservedMemory;
- }
-
- public void setReservedMemory(double reservedMemory) {
- this.reservedMemory = reservedMemory;
- }
-
- public int getServerStatus() {
- return serverStatus;
- }
-
- public void setServerStatus(int serverStatus) {
- this.serverStatus = serverStatus;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public int getWorkerHostWeight() {
- return workerHostWeight;
- }
-
- public void setWorkerHostWeight(int workerHostWeight) {
- this.workerHostWeight = workerHostWeight;
- }
-
- public int getWorkerWaitingTaskCount() {
- return workerWaitingTaskCount;
- }
-
- public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
- this.workerWaitingTaskCount = workerWaitingTaskCount;
- }
-
- public int getWorkerExecThreadCount() {
- return workerExecThreadCount;
- }
-
- public void setWorkerExecThreadCount(int workerExecThreadCount) {
- this.workerExecThreadCount = workerExecThreadCount;
- }
-
- public HeartBeat() {
- this.reportTime = System.currentTimeMillis();
- this.serverStatus = Constants.NORMAL_NODE_STATUS;
- }
-
- public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) {
- this.reportTime = System.currentTimeMillis();
- this.serverStatus = Constants.NORMAL_NODE_STATUS;
- this.startupTime = startupTime;
- this.maxCpuloadAvg = maxCpuloadAvg;
- this.reservedMemory = reservedMemory;
- }
-
- public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) {
- this.reportTime = System.currentTimeMillis();
- this.serverStatus = Constants.NORMAL_NODE_STATUS;
- this.startupTime = startupTime;
- this.maxCpuloadAvg = maxCpuloadAvg;
- this.reservedMemory = reservedMemory;
- this.workerHostWeight = hostWeight;
- this.workerExecThreadCount = workerExecThreadCount;
- }
-
- /**
- * fill system info
- */
- private void fillSystemInfo() {
- this.cpuUsage = OSUtils.cpuUsage();
- this.loadAverage = OSUtils.loadAverage();
- this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
- this.memoryUsage = OSUtils.memoryUsage();
- this.diskAvailable = OSUtils.diskAvailable();
- this.processId = OSUtils.getProcessID();
- }
-
- /**
- * update server state
- */
- public void updateServerState() {
- this.reportTime = System.currentTimeMillis();
- if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
- logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
- loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
- this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
- } else if (workerWaitingTaskCount > workerExecThreadCount) {
- logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
- this.serverStatus = Constants.BUSY_NODE_STATUE;
- } else {
- this.serverStatus = Constants.NORMAL_NODE_STATUS;
- }
- }
-
- /**
- * encode heartbeat
- */
- public String encodeHeartBeat() {
- this.fillSystemInfo();
- this.updateServerState();
-
- StringBuilder builder = new StringBuilder(100);
- builder.append(cpuUsage).append(Constants.COMMA);
- builder.append(memoryUsage).append(Constants.COMMA);
- builder.append(loadAverage).append(Constants.COMMA);
- builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
- builder.append(maxCpuloadAvg).append(Constants.COMMA);
- builder.append(reservedMemory).append(Constants.COMMA);
- builder.append(startupTime).append(Constants.COMMA);
- builder.append(reportTime).append(Constants.COMMA);
- builder.append(serverStatus).append(Constants.COMMA);
- builder.append(processId).append(Constants.COMMA);
- builder.append(workerHostWeight).append(Constants.COMMA);
- builder.append(workerExecThreadCount).append(Constants.COMMA);
- builder.append(workerWaitingTaskCount).append(Constants.COMMA);
- builder.append(diskAvailable);
-
- return builder.toString();
- }
-
- /**
- * decode heartbeat
- */
- public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
- String[] parts = heartBeatInfo.split(Constants.COMMA);
- if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
- return null;
- }
- HeartBeat heartBeat = new HeartBeat();
- heartBeat.cpuUsage = Double.parseDouble(parts[0]);
- heartBeat.memoryUsage = Double.parseDouble(parts[1]);
- heartBeat.loadAverage = Double.parseDouble(parts[2]);
- heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
- heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
- heartBeat.reservedMemory = Double.parseDouble(parts[5]);
- heartBeat.startupTime = Long.parseLong(parts[6]);
- heartBeat.reportTime = Long.parseLong(parts[7]);
- heartBeat.serverStatus = Integer.parseInt(parts[8]);
- heartBeat.processId = Integer.parseInt(parts[9]);
- heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
- heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
- heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
- heartBeat.diskAvailable = Double.parseDouble(parts[13]);
- return heartBeat;
- }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 1a09ae11ef..b94e34e510 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -17,28 +17,6 @@
package org.apache.dolphinscheduler.common.utils;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
-import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
-import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
-import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TimeZone;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -55,6 +33,26 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* json utils
@@ -130,7 +128,7 @@ public class JSONUtils {
* @return an object of type T from the string
* classOfT
*/
- public static <T> T parseObject(String json, Class<T> clazz) {
+ public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
if (StringUtils.isEmpty(json)) {
return null;
}
@@ -138,7 +136,7 @@ public class JSONUtils {
try {
return objectMapper.readValue(json, clazz);
} catch (Exception e) {
- logger.error("parse object exception!", e);
+ logger.error("Parse object exception, jsonStr: {}, class: {}", json, clazz, e);
}
return null;
}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
deleted file mode 100644
index c207f6bfb5..0000000000
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import org.junit.Test;
-
-/**
- * NetUtilsTest
- */
-public class HeartBeatTest {
-
- @Test
- public void testAbnormalState() {
- long startupTime = System.currentTimeMillis();
- double loadAverage = 100;
- double reservedMemory = 100;
- HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory);
- heartBeat.updateServerState();
- assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus());
- }
-
- @Test
- public void testBusyState() {
- long startupTime = System.currentTimeMillis();
- double loadAverage = 0;
- double reservedMemory = 0;
- int hostWeight = 1;
- int taskCount = 200;
- int workerThreadCount = 199;
- HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount);
-
- heartBeat.setWorkerWaitingTaskCount(taskCount);
- heartBeat.updateServerState();
- assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
- }
-
- @Test
- public void testDecodeHeartBeat() throws Exception {
- String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200,65.86";
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
-
- double delta = 0.001;
- assertEquals(0.35, heartBeat.getCpuUsage(), delta);
- assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
- assertEquals(3.09, heartBeat.getLoadAverage(), delta);
- assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
- assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
- assertEquals(1.0, heartBeat.getReservedMemory(), delta);
- assertEquals(1634033006749L, heartBeat.getStartupTime());
- assertEquals(1634033006857L, heartBeat.getReportTime());
- assertEquals(1, heartBeat.getServerStatus());
- assertEquals(29732, heartBeat.getProcessId());
- assertEquals(199, heartBeat.getWorkerExecThreadCount());
- assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
- assertEquals(65.86, heartBeat.getDiskAvailable(), delta);
- }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 26e2cbe02a..b0426f477a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -95,7 +95,7 @@ public class MasterConfig implements Validator {
private String masterAddress;
// /nodes/master/ip:listenPort
- private String masterRegistryNodePath;
+ private String masterRegistryPath;
@Override
public boolean supports(Class<?> clazz) {
@@ -139,8 +139,7 @@ public class MasterConfig implements Validator {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
- masterConfig
- .setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
+ masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
printConfig();
}
@@ -161,6 +160,6 @@ public class MasterConfig implements Validator {
logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
logger.info("Master config: masterAddress -> {} ", masterAddress);
- logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath);
+ logger.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 4051068c53..bbf84a3d74 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -27,14 +27,13 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import org.springframework.beans.factory.annotation.Autowired;
-
/**
* common host manager
*/
@@ -80,23 +79,10 @@ public abstract class CommonHostManager implements HostManager {
Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {
for (String node : nodes) {
- String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
- int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat);
- hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup));
+ WorkerHeartBeat workerNodeInfo = serverNodeManager.getWorkerNodeInfo(node);
+ hostWorkers.add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup));
}
}
return hostWorkers;
}
-
- protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
- int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
- if (!StringUtils.isEmpty(heartBeatInfo)) {
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
- if (heartBeat != null) {
- hostWeight = heartBeat.getWorkerHostWeight();
- }
- }
- return hostWeight;
- }
-
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 5002484144..3ad46ddd53 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -17,18 +17,19 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,11 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* lower weight host manager
*/
@@ -97,7 +93,7 @@ public class LowerWeightHostManager extends CommonHostManager {
private class WorkerWeightListener implements WorkerInfoChangeListener {
@Override
- public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
+ public void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo) {
syncWorkerResources(workerGroups, workerNodeInfo);
}
}
@@ -109,7 +105,7 @@ public class LowerWeightHostManager extends CommonHostManager {
* @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
*/
private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
- final Map<String, String> workerNodeInfoMap) {
+ final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
try {
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
@@ -117,7 +113,7 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<String> nodes = entry.getValue();
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
- String heartbeat = workerNodeInfoMap.getOrDefault(node, null);
+ WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
hostWeightOpt.ifPresent(hostWeights::add);
}
@@ -131,13 +127,9 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
- private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
- if (StringUtils.isEmpty(heartBeatInfo)) {
- logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
- return Optional.empty();
- }
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+ public Optional<HostWeight> getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) {
if (heartBeat == null) {
+ logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
@@ -151,12 +143,15 @@ public class LowerWeightHostManager extends CommonHostManager {
return Optional.empty();
}
return Optional.of(
- new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
- heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
- heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
+ new HostWeight(
+ HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
+ heartBeat.getCpuUsage(),
+ heartBeat.getMemoryUsage(),
+ heartBeat.getLoadAverage(),
+ heartBeat.getWorkerWaitingTaskCount(),
+ heartBeat.getStartupTime()));
}
-
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
deleted file mode 100644
index 5ca7c87f1b..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Master heart beat task
- */
-public class MasterHeartBeatTask implements Runnable {
-
- private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);
-
- private final Set<String> heartBeatPaths;
- private final RegistryClient registryClient;
- private final HeartBeat heartBeat;
- private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
-
- public MasterHeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- Set<String> heartBeatPaths,
- RegistryClient registryClient) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
- }
-
- public String getHeartBeatInfo() {
- return this.heartBeat.encodeHeartBeat();
- }
-
- @Override
- public void run() {
- try {
- if (!ServerLifeCycleManager.isRunning()) {
- return;
- }
- for (String heartBeatPath : heartBeatPaths) {
- registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
- }
- heartBeatErrorTimes.set(0);
- } catch (Throwable ex) {
- logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
- }
- }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 99a65e9fce..ac8d60e20e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,36 +17,28 @@
package org.apache.dolphinscheduler.server.master.registry;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
+import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.time.Duration;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.common.collect.Sets;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/**
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And start a {@link MasterHeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient implements AutoCloseable {
@@ -65,18 +57,11 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired
private MasterConnectStrategy masterConnectStrategy;
- private ScheduledExecutorService heartBeatExecutor;
-
- /**
- * master startup time, ms
- */
- private long startupTime;
+ private MasterHeartBeatTask masterHeartBeatTask;
public void start() {
try {
- this.startupTime = System.currentTimeMillis();
- this.heartBeatExecutor =
- Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+ this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
// master registry
registry();
registryClient.addConnectionStateListener(
@@ -166,17 +151,11 @@ public class MasterRegistryClient implements AutoCloseable {
*/
void registry() {
logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
- String localNodePath = masterConfig.getMasterRegistryNodePath();
- Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
- MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(localNodePath),
- registryClient);
+ String masterRegistryPath = masterConfig.getMasterRegistryPath();
// remove before persist
- registryClient.remove(localNodePath);
- registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
+ registryClient.remove(masterRegistryPath);
+ registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
@@ -186,19 +165,18 @@ public class MasterRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
- this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
- TimeUnit.SECONDS);
- logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
- masterConfig.getMasterAddress(), masterHeartbeatInterval);
+ masterHeartBeatTask.start();
+ logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
}
public void deregister() {
try {
- registryClient.remove(masterConfig.getMasterRegistryNodePath());
+ registryClient.remove(masterConfig.getMasterRegistryPath());
logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
- heartBeatExecutor.shutdown();
- logger.info("MasterServer heartbeat executor shutdown");
+ if (masterHeartBeatTask != null) {
+ masterHeartBeatTask.shutdown();
+ }
registryClient.close();
} catch (Exception e) {
logger.error("MasterServer remove registry path exception ", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 58df904fc5..af96da3f2f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.master.registry;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
@@ -34,10 +34,13 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
+import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -52,14 +55,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.PreDestroy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
/**
* server node manager
@@ -69,23 +68,19 @@ public class ServerNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);
- /**
- * master lock
- */
private final Lock masterLock = new ReentrantLock();
- /**
- * worker group lock
- */
- private final Lock workerGroupLock = new ReentrantLock();
+ private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
- /**
- * worker node info lock
- */
- private final Lock workerNodeInfoLock = new ReentrantLock();
+
+ private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/**
- * worker group nodes
+ * worker group nodes, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
@@ -94,10 +89,7 @@ public class ServerNodeManager implements InitializingBean {
*/
private final Set<String> masterNodes = new HashSet<>();
- /**
- * worker node info
- */
- private final Map<String, String> workerNodeInfo = new HashMap<>();
+ private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();
/**
* executor service
@@ -108,7 +100,7 @@ public class ServerNodeManager implements InitializingBean {
private RegistryClient registryClient;
/**
- * eg : /node/worker/group/127.0.0.1:xxx
+ * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
*/
private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
@@ -244,26 +236,29 @@ public class ServerNodeManager implements InitializingBean {
final String data = event.data();
if (registryClient.isWorkerPath(path)) {
try {
+ String[] parts = path.split("/");
+ if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
+ throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
+ }
+ final String workerGroupName = parts[parts.length - 2];
+ final String workerAddress = parts[parts.length - 1];
+
if (type == Type.ADD) {
logger.info("worker group node : {} added.", path);
- String group = parseGroup(path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
logger.info("currentNodes : {}", currentNodes);
- syncWorkerGroupNodes(group, currentNodes);
+ syncWorkerGroupNodes(workerGroupName, currentNodes);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
- String group = parseGroup(path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
- syncWorkerGroupNodes(group, currentNodes);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
+ syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
- String group = parseGroup(path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
- syncWorkerGroupNodes(group, currentNodes);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
+ syncWorkerGroupNodes(workerGroupName, currentNodes);
- String node = parseNode(path);
- syncSingleWorkerNodeInfo(node, data);
+ syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
@@ -274,22 +269,6 @@ public class ServerNodeManager implements InitializingBean {
}
}
-
- private String parseGroup(String path) {
- String[] parts = path.split("/");
- if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
- throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
- }
- return parts[parts.length - 2];
- }
-
- private String parseNode(String path) {
- String[] parts = path.split("/");
- if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
- throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
- }
- return parts[parts.length - 1];
- }
}
class MasterDataListener implements SubscribeListener {
@@ -333,20 +312,6 @@ public class ServerNodeManager implements InitializingBean {
}
- /**
- * get master nodes
- *
- * @return master nodes
- */
- public Set<String> getMasterNodes() {
- masterLock.lock();
- try {
- return Collections.unmodifiableSet(masterNodes);
- } finally {
- masterLock.unlock();
- }
- }
-
/**
* sync master nodes
*
@@ -355,18 +320,17 @@ public class ServerNodeManager implements InitializingBean {
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
- String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
- int index = masterPriorityQueue.getIndex(addr);
+ int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
if (index >= 0) {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
- logger.warn("current addr:{} is not in active master list", addr);
+ logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
}
- logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
+ logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
@@ -379,19 +343,24 @@ public class ServerNodeManager implements InitializingBean {
* @param nodes worker nodes
*/
private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
- workerGroupLock.lock();
+ workerGroupWriteLock.lock();
try {
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
workerGroupNodes.put(workerGroup, workerNodes);
} finally {
- workerGroupLock.unlock();
+ workerGroupWriteLock.unlock();
}
}
public Map<String, Set<String>> getWorkerGroupNodes() {
- return Collections.unmodifiableMap(workerGroupNodes);
+ workerGroupReadLock.lock();
+ try {
+ return Collections.unmodifiableMap(workerGroupNodes);
+ } finally {
+ workerGroupReadLock.unlock();
+ }
}
/**
@@ -401,7 +370,7 @@ public class ServerNodeManager implements InitializingBean {
* @return worker nodes
*/
public Set<String> getWorkerGroupNodes(String workerGroup) {
- workerGroupLock.lock();
+ workerGroupReadLock.lock();
try {
if (StringUtils.isEmpty(workerGroup)) {
workerGroup = Constants.DEFAULT_WORKER_GROUP;
@@ -412,16 +381,11 @@ public class ServerNodeManager implements InitializingBean {
}
return nodes;
} finally {
- workerGroupLock.unlock();
+ workerGroupReadLock.unlock();
}
}
- /**
- * get worker node info
- *
- * @return worker node info
- */
- public Map<String, String> getWorkerNodeInfo() {
+ public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
return Collections.unmodifiableMap(workerNodeInfo);
}
@@ -431,12 +395,12 @@ public class ServerNodeManager implements InitializingBean {
* @param workerNode worker node
* @return worker node info
*/
- public String getWorkerNodeInfo(String workerNode) {
- workerNodeInfoLock.lock();
+ public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
+ workerNodeInfoReadLock.lock();
try {
return workerNodeInfo.getOrDefault(workerNode, null);
} finally {
- workerNodeInfoLock.unlock();
+ workerNodeInfoReadLock.unlock();
}
}
@@ -446,24 +410,26 @@ public class ServerNodeManager implements InitializingBean {
* @param newWorkerNodeInfo new worker node info
*/
private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
- workerNodeInfoLock.lock();
+ workerNodeInfoWriteLock.lock();
try {
workerNodeInfo.clear();
- workerNodeInfo.putAll(newWorkerNodeInfo);
+ for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
+ workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+ }
} finally {
- workerNodeInfoLock.unlock();
+ workerNodeInfoWriteLock.unlock();
}
}
/**
* sync single worker node info
*/
- private void syncSingleWorkerNodeInfo(String node, String info) {
- workerNodeInfoLock.lock();
+ private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
+ workerNodeInfoWriteLock.lock();
try {
workerNodeInfo.put(node, info);
} finally {
- workerNodeInfoLock.unlock();
+ workerNodeInfoWriteLock.unlock();
}
}
@@ -478,7 +444,7 @@ public class ServerNodeManager implements InitializingBean {
private void notifyWorkerInfoChangeListeners() {
Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
- Map<String, String> workerNodeInfo = getWorkerNodeInfo();
+ Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
listener.notify(workerGroupNodes, workerNodeInfo);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
index f885a6fba0..9efc3517b8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.registry;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+
import java.util.Map;
import java.util.Set;
@@ -31,6 +33,6 @@ public interface WorkerInfoChangeListener {
* @param workerGroups worker groups map, key is worker group name, value is worker address.
* @param workerNodeInfo worker node info map, key is worker address, value is worker info.
*/
- void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+ void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
new file mode 100644
index 0000000000..53b90b7370
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.task;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+@Slf4j
+public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
+
+ private final MasterConfig masterConfig;
+
+ private final RegistryClient registryClient;
+
+ private final String heartBeatPath;
+
+ private final int processId;
+
+ public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
+ @NonNull RegistryClient registryClient) {
+ super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis());
+ this.masterConfig = masterConfig;
+ this.registryClient = registryClient;
+ this.heartBeatPath = masterConfig.getMasterRegistryPath();
+ this.processId = OSUtils.getProcessID();
+ }
+
+ @Override
+ public MasterHeartBeat getHeartBeat() {
+ return MasterHeartBeat.builder()
+ .startupTime(ServerLifeCycleManager.getServerStartupTime())
+ .reportTime(System.currentTimeMillis())
+ .cpuUsage(OSUtils.cpuUsage())
+ .loadAverage(OSUtils.loadAverage())
+ .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
+ .maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
+ .reservedMemory(masterConfig.getReservedMemory())
+ .processId(processId)
+ .build();
+ }
+
+ @Override
+ public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
+ String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
+ registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
+ log.info("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
+ heartBeatPath, masterHeartBeatJson);
+ }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
index c8121d494b..c0c554e43b 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -55,6 +56,7 @@ public class RoundRobinHostManagerTest {
@Test
public void testSelectWithResult() {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
+ Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new WorkerHeartBeat());
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host host = roundRobinHostManager.select(context);
Assert.assertTrue(!Strings.isNullOrEmpty(host.getAddress()));
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index cc17d22df7..b71e267216 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -69,6 +70,12 @@ public class MasterRegistryClientTest {
@Mock
private ProcessService processService;
+ @Mock
+ private MasterConnectStrategy masterConnectStrategy;
+
+ @Mock
+ private MasterHeartBeatTask masterHeartBeatTask;
+
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@@ -81,6 +88,7 @@ public class MasterRegistryClientTest {
});
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
+ ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index a6c6218fee..508ba293aa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.service.registry;
import com.google.common.base.Strings;
import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
@@ -94,21 +96,33 @@ public class RegistryClient {
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
- HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
- if (heartBeat == null) {
+ String serverPath = entry.getKey();
+ String heartBeatJson = entry.getValue();
+ if (StringUtils.isEmpty(heartBeatJson)) {
+ logger.error("The heartBeatJson is empty, serverPath: {}", serverPath);
continue;
}
-
Server server = new Server();
- server.setResInfo(JSONUtils.toJsonString(heartBeat));
- server.setCreateTime(new Date(heartBeat.getStartupTime()));
- server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
- server.setId(heartBeat.getProcessId());
+ switch (nodeType) {
+ case MASTER:
+ MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class);
+ server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
+ server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
+ server.setId(masterHeartBeat.getProcessId());
+ break;
+ case WORKER:
+ WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class);
+ server.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
+ server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime()));
+ server.setId(workerHeartBeat.getProcessId());
+ break;
+ }
- String key = entry.getKey();
- server.setZkDirectory(parentPath + "/" + key);
+ server.setResInfo(heartBeatJson);
+ // todo: add host, port in heartBeat Info, so that we don't need to parse this again
+ server.setZkDirectory(parentPath + "/" + serverPath);
// set host and port
- String[] hostAndPort = key.split(COLON);
+ String[] hostAndPort = serverPath.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
server.setHost(hosts[hosts.length - 1]);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f29f93eed3..555b5bc770 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@@ -95,6 +96,9 @@ public class WorkerServer implements IStoppable {
@Autowired
private MessageRetryRunner messageRetryRunner;
+ @Autowired
+ private WorkerConfig workerConfig;
+
/**
* worker server startup, not use web service
*
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index f50e6f2d03..a7e6927415 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.config;
import com.google.common.collect.Sets;
import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.slf4j.Logger;
@@ -31,6 +32,9 @@ import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
@Data
@Validated
@@ -57,6 +61,7 @@ public class WorkerConfig implements Validator {
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
private String workerAddress;
+ private Set<String> workerGroupRegistryPaths;
@Override
public boolean supports(Class<?> clazz) {
@@ -76,6 +81,18 @@ public class WorkerConfig implements Validator {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
+
+ workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
+ if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
+ errors.rejectValue("groups", null, "should not be empty");
+ }
+
+ Set<String> workerRegistryPaths = workerConfig.getGroups()
+ .stream()
+ .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
+ .collect(Collectors.toSet());
+
+ workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
printConfig();
}
@@ -93,5 +110,6 @@ public class WorkerConfig implements Validator {
logger.info("Worker config: alertListenPort -> {}", alertListenPort);
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
+ logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
deleted file mode 100644
index 84506753be..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Heart beat task
- */
-public class WorkerHeartBeatTask implements Runnable {
-
- private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
-
- private final Set<String> heartBeatPaths;
- private final RegistryClient registryClient;
- private int workerWaitingTaskCount;
- private final HeartBeat heartBeat;
-
- private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
-
- public WorkerHeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- int hostWeight,
- Set<String> heartBeatPaths,
- RegistryClient registryClient,
- int workerThreadCount,
- int workerWaitingTaskCount) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.workerWaitingTaskCount = workerWaitingTaskCount;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
- }
-
- public String getHeartBeatInfo() {
- return this.heartBeat.encodeHeartBeat();
- }
-
- @Override
- public void run() {
- try {
- if (!ServerLifeCycleManager.isRunning()) {
- return;
- }
- heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime());
- // update waiting task count
- heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
- for (String heartBeatPath : heartBeatPaths) {
- registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
- }
- heartBeatErrorTimes.set(0);
- } catch (Throwable ex) {
- logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
- }
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 5c3f7bf507..d2147d1559 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -17,80 +17,52 @@
package org.apache.dolphinscheduler.server.worker.registry;
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
-import java.util.Set;
-import java.util.StringJoiner;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-/**
- * worker registry
- */
+@Slf4j
@Service
public class WorkerRegistryClient implements AutoCloseable {
- private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
-
- /**
- * worker config
- */
@Autowired
private WorkerConfig workerConfig;
- /**
- * worker manager
- */
@Autowired
private WorkerManagerThread workerManagerThread;
- /**
- * heartbeat executor
- */
- private ScheduledExecutorService heartBeatExecutor;
-
@Autowired
private RegistryClient registryClient;
@Autowired
private WorkerConnectStrategy workerConnectStrategy;
- /**
- * worker startup time, ms
- */
- private long startupTime;
+ private WorkerHeartBeatTask workerHeartBeatTask;
- private Set<String> workerGroups;
@PostConstruct
public void initWorkRegistry() {
- this.workerGroups = workerConfig.getGroups();
- this.startupTime = System.currentTimeMillis();
- this.heartBeatExecutor =
- Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+ this.workerHeartBeatTask = new WorkerHeartBeatTask(
+ workerConfig,
+ registryClient,
+ () -> workerManagerThread.getWaitSubmitQueueSize());
}
public void start() {
@@ -107,24 +79,13 @@ public class WorkerRegistryClient implements AutoCloseable {
* registry
*/
private void registry() {
- String address = NetUtils.getAddr(workerConfig.getListenPort());
- Set<String> workerZkPaths = getWorkerZkPaths();
- long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
-
- WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
- workerConfig.getMaxCpuLoadAvg(),
- workerConfig.getReservedMemory(),
- workerConfig.getHostWeight(),
- workerZkPaths,
- registryClient,
- workerConfig.getExecThreads(),
- workerManagerThread.getThreadPoolQueueSize());
-
- for (String workerZKPath : workerZkPaths) {
+ WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
+
+ for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
// remove before persist
registryClient.remove(workerZKPath);
- registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo());
- logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
+ registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
+ log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
}
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
@@ -134,37 +95,9 @@ public class WorkerRegistryClient implements AutoCloseable {
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
- this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval,
- TimeUnit.SECONDS);
- logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
- }
-
- /**
- * get worker path
- */
- public Set<String> getWorkerZkPaths() {
- Set<String> workerPaths = Sets.newHashSet();
- String address = getLocalAddress();
-
- for (String workGroup : this.workerGroups) {
- StringJoiner workerPathJoiner = new StringJoiner(SINGLE_SLASH);
- workerPathJoiner.add(REGISTRY_DOLPHINSCHEDULER_WORKERS);
- if (Strings.isNullOrEmpty(workGroup)) {
- workGroup = DEFAULT_WORKER_GROUP;
- }
- // trim and lower case is need
- workerPathJoiner.add(workGroup.trim().toLowerCase());
- workerPathJoiner.add(address);
- workerPaths.add(workerPathJoiner.toString());
- }
- return workerPaths;
- }
- /**
- * get local address
- */
- private String getLocalAddress() {
- return NetUtils.getAddr(workerConfig.getListenPort());
+ workerHeartBeatTask.start();
+ log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
public void setRegistryStoppable(IStoppable stoppable) {
@@ -173,12 +106,11 @@ public class WorkerRegistryClient implements AutoCloseable {
@Override
public void close() throws IOException {
- if (heartBeatExecutor != null) {
- heartBeatExecutor.shutdownNow();
- logger.info("Heartbeat executor shutdown");
+ if (workerHeartBeatTask != null) {
+ workerHeartBeatTask.shutdown();
}
registryClient.close();
- logger.info("registry client closed");
+ log.info("Worker registry client closed");
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
new file mode 100644
index 0000000000..672135613a
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.function.Supplier;
+
+/**
+ * Heartbeat task of a worker: builds a {@link WorkerHeartBeat} from OS metrics and the
+ * worker configuration, and writes it as JSON to every worker-group registry path.
+ */
+@Slf4j
+public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
+
+    // Worker configuration; supplies the thresholds, host weight, exec thread count and registry paths.
+    private final WorkerConfig workerConfig;
+    // Client used to publish the serialized heartbeat into the registry.
+    private final RegistryClient registryClient;
+
+    // Supplies the current number of waiting tasks; the value may change between calls.
+    private final Supplier<Integer> workerWaitingTaskCount;
+
+    // Value of OSUtils.getProcessID(), captured once at construction time.
+    private final int processId;
+
+    /**
+     * Creates the worker heartbeat task.
+     * The task name and the configured heartbeat interval (converted to milliseconds)
+     * are handed to the base class.
+     *
+     * @param workerConfig           worker configuration, must not be null
+     * @param registryClient         registry client used to write the heartbeat, must not be null
+     * @param workerWaitingTaskCount supplier of the current waiting task count, must not be null
+     */
+    public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
+                               @NonNull RegistryClient registryClient,
+                               @NonNull Supplier<Integer> workerWaitingTaskCount) {
+        super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis());
+        this.workerConfig = workerConfig;
+        this.registryClient = registryClient;
+        this.workerWaitingTaskCount = workerWaitingTaskCount;
+        this.processId = OSUtils.getProcessID();
+    }
+
+    /**
+     * Builds a heartbeat from the current OS metrics, the worker configuration and the
+     * current waiting task count.
+     *
+     * @return a newly built {@link WorkerHeartBeat}
+     */
+    @Override
+    public WorkerHeartBeat getHeartBeat() {
+        double loadAverage = OSUtils.loadAverage();
+        double cpuUsage = OSUtils.cpuUsage();
+        int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
+        double reservedMemory = workerConfig.getReservedMemory();
+        double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+        int execThreads = workerConfig.getExecThreads();
+        // Sample the supplier exactly once: the reported waiting count and the server status
+        // derived from it must describe the same instant, and the supplier may return a
+        // different value on every call.
+        int waitingTaskCount = this.workerWaitingTaskCount.get();
+        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize,
+                reservedMemory, execThreads, waitingTaskCount);
+
+        return WorkerHeartBeat.builder()
+                .startupTime(ServerLifeCycleManager.getServerStartupTime())
+                .reportTime(System.currentTimeMillis())
+                .cpuUsage(cpuUsage)
+                .loadAverage(loadAverage)
+                .availablePhysicalMemorySize(availablePhysicalMemorySize)
+                .maxCpuloadAvg(maxCpuLoadAvg)
+                .reservedMemory(reservedMemory)
+                .processId(processId)
+                .workerHostWeight(workerConfig.getHostWeight())
+                .workerWaitingTaskCount(waitingTaskCount)
+                .workerExecThreadCount(execThreads)
+                .serverStatus(serverStatus)
+                .build();
+    }
+
+    /**
+     * Serializes the heartbeat to JSON once, writes it to every worker-group registry path
+     * via {@code RegistryClient#persistEphemeral} and logs the written payload.
+     *
+     * @param workerHeartBeat the heartbeat to publish
+     */
+    @Override
+    public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
+        String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
+        for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
+            registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
+        }
+        log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
+                workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
+    }
+
+    /**
+     * Derives the node status from load, memory and queue pressure.
+     * The resource check takes precedence over the busy check.
+     *
+     * @param loadAverage                 current load average
+     * @param maxCpuloadAvg               configured maximum load average
+     * @param availablePhysicalMemorySize currently available physical memory
+     * @param reservedMemory              configured reserved memory
+     * @param workerExecThreadCount       configured number of exec threads
+     * @param workerWaitingTaskCount      current number of waiting tasks
+     * @return {@code Constants.ABNORMAL_NODE_STATUS} when the load average exceeds the maximum or
+     *         the available memory is below the reserved amount; {@code Constants.BUSY_NODE_STATUE}
+     *         when more tasks are waiting than there are exec threads; otherwise
+     *         {@code Constants.NORMAL_NODE_STATUS}
+     */
+    public int getServerStatus(double loadAverage,
+                               double maxCpuloadAvg,
+                               double availablePhysicalMemorySize,
+                               double reservedMemory,
+                               int workerExecThreadCount,
+                               int workerWaitingTaskCount) {
+        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
+            log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
+            return Constants.ABNORMAL_NODE_STATUS;
+        } else if (workerWaitingTaskCount > workerExecThreadCount) {
+            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
+            return Constants.BUSY_NODE_STATUE;
+        } else {
+            return Constants.NORMAL_NODE_STATUS;
+        }
+    }
+}