You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/08/23 03:30:21 UTC

[dolphinscheduler] branch dev updated: [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3f2ca7bca3 [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)
3f2ca7bca3 is described below

commit 3f2ca7bca3e612b2ce5d22324d22aa33fac04ea2
Author: JinYong Li <42...@users.noreply.github.com>
AuthorDate: Tue Aug 23 11:30:13 2022 +0800

    [Fix-9980] [Server] fix heartBeatTaskCount bug (#11232)
    
    * fix heartBeat bug
    
    * modify class name
    
    * fix conflict
    
    Co-authored-by: JinyLeeChina <ji...@foxmail.com>
---
 .../master/registry/MasterHeartBeatTask.java       | 43 ++++++----------------
 .../master/registry/MasterRegistryClient.java      | 32 ++++++++--------
 .../worker/registry/WorkerHeartBeatTask.java       | 18 ++-------
 .../worker/registry/WorkerRegistryClient.java      | 17 ++++-----
 4 files changed, 41 insertions(+), 69 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
similarity index 61%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
copy to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index 495d48e342..5ca7c87f1b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -15,55 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.master.registry;
 
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * Heart beat task
+ * Master heart beat task
  */
-public class HeartBeatTask implements Runnable {
+public class MasterHeartBeatTask implements Runnable {
 
-    private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+    private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);
 
     private final Set<String> heartBeatPaths;
     private final RegistryClient registryClient;
-    private int workerWaitingTaskCount;
     private final HeartBeat heartBeat;
-
     private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
 
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         Set<String> heartBeatPaths,
-                         RegistryClient registryClient) {
+    public MasterHeartBeatTask(long startupTime,
+                               double maxCpuloadAvg,
+                               double reservedMemory,
+                               Set<String> heartBeatPaths,
+                               RegistryClient registryClient) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
     }
 
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         int hostWeight,
-                         Set<String> heartBeatPaths,
-                         RegistryClient registryClient,
-                         int workerThreadCount,
-                         int workerWaitingTaskCount) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.workerWaitingTaskCount = workerWaitingTaskCount;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
-    }
-
     public String getHeartBeatInfo() {
         return this.heartBeat.encodeHeartBeat();
     }
@@ -74,10 +59,6 @@ public class HeartBeatTask implements Runnable {
             if (!ServerLifeCycleManager.isRunning()) {
                 return;
             }
-            heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime());
-            // update waiting task count
-            heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
             for (String heartBeatPath : heartBeatPaths) {
                 registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 63938bd928..99a65e9fce 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -27,24 +28,25 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.time.Duration;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.collect.Sets;
 
 /**
  * <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
  */
 @Component
 public class MasterRegistryClient implements AutoCloseable {
@@ -166,11 +168,11 @@ public class MasterRegistryClient implements AutoCloseable {
         logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
         String localNodePath = masterConfig.getMasterRegistryNodePath();
         Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                masterConfig.getMaxCpuLoadAvg(),
-                masterConfig.getReservedMemory(),
-                Sets.newHashSet(localNodePath),
-                registryClient);
+        MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
+            masterConfig.getMaxCpuLoadAvg(),
+            masterConfig.getReservedMemory(),
+            Sets.newHashSet(localNodePath),
+            registryClient);
 
         // remove before persist
         registryClient.remove(localNodePath);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
similarity index 81%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
index 495d48e342..84506753be 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.registry;
+package org.apache.dolphinscheduler.server.worker.registry;
 
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
@@ -29,9 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Heart beat task
  */
-public class HeartBeatTask implements Runnable {
+public class WorkerHeartBeatTask implements Runnable {
 
-    private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
+    private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
 
     private final Set<String> heartBeatPaths;
     private final RegistryClient registryClient;
@@ -40,17 +40,7 @@ public class HeartBeatTask implements Runnable {
 
     private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
 
-    public HeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         Set<String> heartBeatPaths,
-                         RegistryClient registryClient) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
-    }
-
-    public HeartBeatTask(long startupTime,
+    public WorkerHeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          int hostWeight,
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 225439b33d..5c3f7bf507 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
-import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -112,14 +111,14 @@ public class WorkerRegistryClient implements AutoCloseable {
         Set<String> workerZkPaths = getWorkerZkPaths();
         long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
 
-        HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                workerConfig.getMaxCpuLoadAvg(),
-                workerConfig.getReservedMemory(),
-                workerConfig.getHostWeight(),
-                workerZkPaths,
-                registryClient,
-                workerConfig.getExecThreads(),
-                workerManagerThread.getThreadPoolQueueSize());
+        WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
+            workerConfig.getMaxCpuLoadAvg(),
+            workerConfig.getReservedMemory(),
+            workerConfig.getHostWeight(),
+            workerZkPaths,
+            registryClient,
+            workerConfig.getExecThreads(),
+            workerManagerThread.getThreadPoolQueueSize());
 
         for (String workerZKPath : workerZkPaths) {
             // remove before persist