You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/08/23 03:30:21 UTC
[dolphinscheduler] branch dev updated: [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f2ca7bca3 [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)
3f2ca7bca3 is described below
commit 3f2ca7bca3e612b2ce5d22324d22aa33fac04ea2
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Tue Aug 23 11:30:13 2022 +0800
[Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)
* fix heartBeat bug
* modify class name
* fix conflict
Co-authored-by: JinyLeeChina <ji...@foxmail.com>
---
.../master/registry/MasterHeartBeatTask.java | 43 ++++++----------------
.../master/registry/MasterRegistryClient.java | 32 ++++++++--------
.../worker/registry/WorkerHeartBeatTask.java | 18 ++-------
.../worker/registry/WorkerRegistryClient.java | 17 ++++-----
4 files changed, 41 insertions(+), 69 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
similarity index 61%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 495d48e342..5ca7c87f1b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -15,55 +15,40 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * Heart beat task
+ * Master heart beat task
*/
-public class HeartBeatTask implements Runnable {
+public class MasterHeartBeatTask implements Runnable {
- private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+ private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
- private int workerWaitingTaskCount;
private final HeartBeat heartBeat;
-
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- Set<String> heartBeatPaths,
- RegistryClient registryClient) {
+ public MasterHeartBeatTask(long startupTime,
+ double maxCpuloadAvg,
+ double reservedMemory,
+ Set<String> heartBeatPaths,
+ RegistryClient registryClient) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
}
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- int hostWeight,
- Set<String> heartBeatPaths,
- RegistryClient registryClient,
- int workerThreadCount,
- int workerWaitingTaskCount) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.workerWaitingTaskCount = workerWaitingTaskCount;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
- }
-
public String getHeartBeatInfo() {
return this.heartBeat.encodeHeartBeat();
}
@@ -74,10 +59,6 @@ public class HeartBeatTask implements Runnable {
if (!ServerLifeCycleManager.isRunning()) {
return;
}
- heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime());
- // update waiting task count
- heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 63938bd928..99a65e9fce 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -27,24 +28,25 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+
+import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.collect.Sets;
/**
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient implements AutoCloseable {
@@ -166,11 +168,11 @@ public class MasterRegistryClient implements AutoCloseable {
logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
String localNodePath = masterConfig.getMasterRegistryNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(localNodePath),
- registryClient);
+ MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
+ masterConfig.getMaxCpuLoadAvg(),
+ masterConfig.getReservedMemory(),
+ Sets.newHashSet(localNodePath),
+ registryClient);
// remove before persist
registryClient.remove(localNodePath);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
similarity index 81%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
index 495d48e342..84506753be 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
@@ -29,9 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Heart beat task
*/
-public class HeartBeatTask implements Runnable {
+public class WorkerHeartBeatTask implements Runnable {
- private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+ private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
private final Set<String> heartBeatPaths;
private final RegistryClient registryClient;
@@ -40,17 +40,7 @@ public class HeartBeatTask implements Runnable {
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
- public HeartBeatTask(long startupTime,
- double maxCpuloadAvg,
- double reservedMemory,
- Set<String> heartBeatPaths,
- RegistryClient registryClient) {
- this.heartBeatPaths = heartBeatPaths;
- this.registryClient = registryClient;
- this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
- }
-
- public HeartBeatTask(long startupTime,
+ public WorkerHeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 225439b33d..5c3f7bf507 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -112,14 +111,14 @@ public class WorkerRegistryClient implements AutoCloseable {
Set<String> workerZkPaths = getWorkerZkPaths();
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
- HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- workerConfig.getMaxCpuLoadAvg(),
- workerConfig.getReservedMemory(),
- workerConfig.getHostWeight(),
- workerZkPaths,
- registryClient,
- workerConfig.getExecThreads(),
- workerManagerThread.getThreadPoolQueueSize());
+ WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
+ workerConfig.getMaxCpuLoadAvg(),
+ workerConfig.getReservedMemory(),
+ workerConfig.getHostWeight(),
+ workerZkPaths,
+ registryClient,
+ workerConfig.getExecThreads(),
+ workerManagerThread.getThreadPoolQueueSize());
for (String workerZKPath : workerZkPaths) {
// remove before persist