You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/31 08:20:34 UTC

[dolphinscheduler] branch dev updated: Refactor heart beat task, use json to serialize/deserialize (#11702)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 67e7f88d8b Refactor heart beat task, use json to serialize/deserialize (#11702)
67e7f88d8b is described below

commit 67e7f88d8b7f65696d36fcb3c85c2d0b7836cae8
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Aug 31 16:20:23 2022 +0800

    Refactor heart beat task, use json to serialize/deserialize (#11702)
    
    * Refactor heart beat task, use json to serialize/deserialize
---
 .../api/service/impl/WorkerGroupServiceImpl.java   |  10 +-
 .../common/model/BaseHeartBeatTask.java            |  81 +++++++
 .../dolphinscheduler/common/model/HeartBeat.java   |  19 +-
 .../common/model/MasterHeartBeat.java              |  35 +--
 .../dolphinscheduler/common/model/Server.java      |  92 +-------
 .../common/model/WorkerHeartBeat.java              |  39 +--
 .../dolphinscheduler/common/utils/HeartBeat.java   | 261 ---------------------
 .../dolphinscheduler/common/utils/JSONUtils.java   |  46 ++--
 .../common/utils/HeartBeatTest.java                |  77 ------
 .../server/master/config/MasterConfig.java         |   7 +-
 .../master/dispatch/host/CommonHostManager.java    |  24 +-
 .../dispatch/host/LowerWeightHostManager.java      |  39 ++-
 .../master/registry/MasterHeartBeatTask.java       |  70 ------
 .../master/registry/MasterRegistryClient.java      |  56 ++---
 .../server/master/registry/ServerNodeManager.java  | 160 +++++--------
 .../master/registry/WorkerInfoChangeListener.java  |   4 +-
 .../server/master/task/MasterHeartBeatTask.java    |  71 ++++++
 .../dispatch/host/RoundRobinHostManagerTest.java   |   2 +
 .../master/registry/MasterRegistryClientTest.java  |   8 +
 .../service/registry/RegistryClient.java           |  36 ++-
 .../server/worker/WorkerServer.java                |   4 +
 .../server/worker/config/WorkerConfig.java         |  18 ++
 .../worker/registry/WorkerHeartBeatTask.java       |  79 -------
 .../worker/registry/WorkerRegistryClient.java      | 110 ++-------
 .../server/worker/task/WorkerHeartBeatTask.java    | 107 +++++++++
 25 files changed, 522 insertions(+), 933 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index eee031942c..732ecd8e7b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -28,7 +28,9 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -318,9 +320,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             wg.setName(workerGroup);
             if (isPaging) {
                 String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
-                HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue);
-                wg.setCreateTime(new Date(heartBeat.getStartupTime()));
-                wg.setUpdateTime(new Date(heartBeat.getReportTime()));
+                WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
+                wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
+                wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
                 wg.setSystemDefault(true);
                 if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
                     wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
new file mode 100644
index 0000000000..557f7ec86b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/BaseHeartBeatTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.model;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+
+@Slf4j
+public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {
+
+    private final String threadName;
+    private final long heartBeatInterval;
+
+    protected boolean runningFlag;
+
+    public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
+        super(threadName);
+        this.threadName = threadName;
+        this.heartBeatInterval = heartBeatInterval;
+        this.runningFlag = true;
+    }
+
+    @Override
+    public synchronized void start() {
+        log.info("Starting {}", threadName);
+        super.start();
+        log.info("Started {}, heartBeatInterval: {}", threadName, heartBeatInterval);
+    }
+
+    @Override
+    public void run() {
+        while (runningFlag) {
+            try {
+                if (!ServerLifeCycleManager.isRunning()) {
+                    log.info("The current server status is {}, will not write heartBeatInfo into registry", ServerLifeCycleManager.getServerStatus());
+                    continue;
+                }
+                T heartBeat = getHeartBeat();
+                writeHeartBeat(heartBeat);
+            } catch (Exception ex) {
+                log.error("{} task execute failed", threadName, ex);
+            } finally {
+                try {
+                    Thread.sleep(heartBeatInterval);
+                } catch (InterruptedException e) {
+                    handleInterruptException(e);
+                }
+            }
+        }
+    }
+
+    public void shutdown() {
+        log.warn("{} task finished", threadName);
+        runningFlag = false;
+    }
+
+    private void handleInterruptException(InterruptedException ex) {
+        log.warn("{} has been interrupted", threadName, ex);
+        Thread.currentThread().interrupt();
+    }
+
+    public abstract T getHeartBeat();
+
+    public abstract void writeHeartBeat(T heartBeat);
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
similarity index 57%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
index f885a6fba0..1a1d1610e4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
@@ -15,22 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.registry;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
-
-    /**
-     * Used to notify the change of worker info.
-     *
-     * @param workerGroups   worker groups map, key is worker group name, value is worker address.
-     * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
-     */
-    void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+package org.apache.dolphinscheduler.common.model;
 
+public interface HeartBeat {
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
similarity index 57%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index f885a6fba0..95ece3522e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -15,22 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.common.model;
 
-import java.util.Map;
-import java.util.Set;
-
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
-
-    /**
-     * Used to notify the change of worker info.
-     *
-     * @param workerGroups   worker groups map, key is worker group name, value is worker address.
-     * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
-     */
-    void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MasterHeartBeat implements HeartBeat {
+    private long startupTime;
+    private long reportTime;
+    private double cpuUsage;
+    private double memoryUsage;
+    private double loadAverage;
+    private double availablePhysicalMemorySize;
+    private double maxCpuloadAvg;
+    private double reservedMemory;
+    private int processId;
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
index 4bd4648e93..743979eed7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
@@ -17,31 +17,19 @@
 
 package org.apache.dolphinscheduler.common.model;
 
+import lombok.Data;
+
 import java.util.Date;
 
-/**
- * server
- */
+@Data
 public class Server {
 
-    /**
-     * id
-     */
     private int id;
 
-    /**
-     * host
-     */
     private String host;
 
-    /**
-     * port
-     */
     private int port;
 
-    /**
-     * master directory in zookeeper
-     */
     private String zkDirectory;
 
     /**
@@ -49,82 +37,8 @@ public class Server {
      */
     private String resInfo;
 
-    /**
-     * create time
-     */
     private Date createTime;
 
-    /**
-     * laster heart beat time
-     */
     private Date lastHeartbeatTime;
 
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    public Date getCreateTime() {
-        return createTime;
-    }
-
-    public void setCreateTime(Date createTime) {
-        this.createTime = createTime;
-    }
-
-    public String getZkDirectory() {
-        return zkDirectory;
-    }
-
-    public void setZkDirectory(String zkDirectory) {
-        this.zkDirectory = zkDirectory;
-    }
-
-    public Date getLastHeartbeatTime() {
-        return lastHeartbeatTime;
-    }
-
-    public void setLastHeartbeatTime(Date lastHeartbeatTime) {
-        this.lastHeartbeatTime = lastHeartbeatTime;
-    }
-
-    public String getResInfo() {
-        return resInfo;
-    }
-
-    public void setResInfo(String resInfo) {
-        this.resInfo = resInfo;
-    }
-
-    @Override
-    public String toString() {
-        return "MasterServer{" +
-                "id=" + id +
-                ", host='" + host + '\'' +
-                ", port=" + port +
-                ", zkDirectory='" + zkDirectory + '\'' +
-                ", resInfo='" + resInfo + '\'' +
-                ", createTime=" + createTime +
-                ", lastHeartbeatTime=" + lastHeartbeatTime +
-                '}';
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
similarity index 50%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index f885a6fba0..4bb765d180 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -15,22 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.common.model;
 
-import java.util.Map;
-import java.util.Set;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-/**
- * The listener used in {@link ServerNodeManager} to notify the change of worker info.
- */
-public interface WorkerInfoChangeListener {
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class WorkerHeartBeat implements HeartBeat {
+
+    private long startupTime;
+    private long reportTime;
+    private double cpuUsage;
+    private double memoryUsage;
+    private double loadAverage;
+    private double availablePhysicalMemorySize;
+    private double maxCpuloadAvg;
+    private double reservedMemory;
+    private int serverStatus;
+    private int processId;
+
+    private int workerHostWeight; // worker host weight
+    private int workerWaitingTaskCount; // worker waiting task count
+    private int workerExecThreadCount; // worker thread pool thread count
 
-    /**
-     * Used to notify the change of worker info.
-     *
-     * @param workerGroups   worker groups map, key is worker group name, value is worker address.
-     * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
-     */
-    void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
 
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
deleted file mode 100644
index ecfa814cc6..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HeartBeat {
-
-    private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);
-
-    private long startupTime;
-    private long reportTime;
-    private double cpuUsage;
-    private double memoryUsage;
-    private double loadAverage;
-    private double availablePhysicalMemorySize;
-    private double maxCpuloadAvg;
-    private double reservedMemory;
-    private int serverStatus;
-    private int processId;
-
-    private int workerHostWeight; // worker host weight
-    private int workerWaitingTaskCount; // worker waiting task count
-    private int workerExecThreadCount; // worker thread pool thread count
-
-    private double diskAvailable;
-
-    public double getDiskAvailable() {
-        return diskAvailable;
-    }
-
-    public void setDiskAvailable(double diskAvailable) {
-        this.diskAvailable = diskAvailable;
-    }
-
-    public long getStartupTime() {
-        return startupTime;
-    }
-
-    public void setStartupTime(long startupTime) {
-        this.startupTime = startupTime;
-    }
-
-    public long getReportTime() {
-        return reportTime;
-    }
-
-    public void setReportTime(long reportTime) {
-        this.reportTime = reportTime;
-    }
-
-    public double getCpuUsage() {
-        return cpuUsage;
-    }
-
-    public void setCpuUsage(double cpuUsage) {
-        this.cpuUsage = cpuUsage;
-    }
-
-    public double getMemoryUsage() {
-        return memoryUsage;
-    }
-
-    public void setMemoryUsage(double memoryUsage) {
-        this.memoryUsage = memoryUsage;
-    }
-
-    public double getLoadAverage() {
-        return loadAverage;
-    }
-
-    public void setLoadAverage(double loadAverage) {
-        this.loadAverage = loadAverage;
-    }
-
-    public double getAvailablePhysicalMemorySize() {
-        return availablePhysicalMemorySize;
-    }
-
-    public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) {
-        this.availablePhysicalMemorySize = availablePhysicalMemorySize;
-    }
-
-    public double getMaxCpuloadAvg() {
-        return maxCpuloadAvg;
-    }
-
-    public void setMaxCpuloadAvg(double maxCpuloadAvg) {
-        this.maxCpuloadAvg = maxCpuloadAvg;
-    }
-
-    public double getReservedMemory() {
-        return reservedMemory;
-    }
-
-    public void setReservedMemory(double reservedMemory) {
-        this.reservedMemory = reservedMemory;
-    }
-
-    public int getServerStatus() {
-        return serverStatus;
-    }
-
-    public void setServerStatus(int serverStatus) {
-        this.serverStatus = serverStatus;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public int getWorkerHostWeight() {
-        return workerHostWeight;
-    }
-
-    public void setWorkerHostWeight(int workerHostWeight) {
-        this.workerHostWeight = workerHostWeight;
-    }
-
-    public int getWorkerWaitingTaskCount() {
-        return workerWaitingTaskCount;
-    }
-
-    public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) {
-        this.workerWaitingTaskCount = workerWaitingTaskCount;
-    }
-
-    public int getWorkerExecThreadCount() {
-        return workerExecThreadCount;
-    }
-
-    public void setWorkerExecThreadCount(int workerExecThreadCount) {
-        this.workerExecThreadCount = workerExecThreadCount;
-    }
-
-    public HeartBeat() {
-        this.reportTime = System.currentTimeMillis();
-        this.serverStatus = Constants.NORMAL_NODE_STATUS;
-    }
-
-    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) {
-        this.reportTime = System.currentTimeMillis();
-        this.serverStatus = Constants.NORMAL_NODE_STATUS;
-        this.startupTime = startupTime;
-        this.maxCpuloadAvg = maxCpuloadAvg;
-        this.reservedMemory = reservedMemory;
-    }
-
-    public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) {
-        this.reportTime = System.currentTimeMillis();
-        this.serverStatus = Constants.NORMAL_NODE_STATUS;
-        this.startupTime = startupTime;
-        this.maxCpuloadAvg = maxCpuloadAvg;
-        this.reservedMemory = reservedMemory;
-        this.workerHostWeight = hostWeight;
-        this.workerExecThreadCount = workerExecThreadCount;
-    }
-
-    /**
-     * fill system info
-     */
-    private void fillSystemInfo() {
-        this.cpuUsage = OSUtils.cpuUsage();
-        this.loadAverage = OSUtils.loadAverage();
-        this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
-        this.memoryUsage = OSUtils.memoryUsage();
-        this.diskAvailable = OSUtils.diskAvailable();
-        this.processId = OSUtils.getProcessID();
-    }
-
-    /**
-     * update server state
-     */
-    public void updateServerState() {
-        this.reportTime = System.currentTimeMillis();
-        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
-            logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
-                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
-            this.serverStatus = Constants.ABNORMAL_NODE_STATUS;
-        } else if (workerWaitingTaskCount > workerExecThreadCount) {
-            logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
-            this.serverStatus = Constants.BUSY_NODE_STATUE;
-        } else {
-            this.serverStatus = Constants.NORMAL_NODE_STATUS;
-        }
-    }
-
-    /**
-     * encode heartbeat
-     */
-    public String encodeHeartBeat() {
-        this.fillSystemInfo();
-        this.updateServerState();
-
-        StringBuilder builder = new StringBuilder(100);
-        builder.append(cpuUsage).append(Constants.COMMA);
-        builder.append(memoryUsage).append(Constants.COMMA);
-        builder.append(loadAverage).append(Constants.COMMA);
-        builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
-        builder.append(maxCpuloadAvg).append(Constants.COMMA);
-        builder.append(reservedMemory).append(Constants.COMMA);
-        builder.append(startupTime).append(Constants.COMMA);
-        builder.append(reportTime).append(Constants.COMMA);
-        builder.append(serverStatus).append(Constants.COMMA);
-        builder.append(processId).append(Constants.COMMA);
-        builder.append(workerHostWeight).append(Constants.COMMA);
-        builder.append(workerExecThreadCount).append(Constants.COMMA);
-        builder.append(workerWaitingTaskCount).append(Constants.COMMA);
-        builder.append(diskAvailable);
-
-        return builder.toString();
-    }
-
-    /**
-     * decode heartbeat
-     */
-    public static HeartBeat decodeHeartBeat(String heartBeatInfo) {
-        String[] parts = heartBeatInfo.split(Constants.COMMA);
-        if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) {
-            return null;
-        }
-        HeartBeat heartBeat = new HeartBeat();
-        heartBeat.cpuUsage = Double.parseDouble(parts[0]);
-        heartBeat.memoryUsage = Double.parseDouble(parts[1]);
-        heartBeat.loadAverage = Double.parseDouble(parts[2]);
-        heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]);
-        heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]);
-        heartBeat.reservedMemory = Double.parseDouble(parts[5]);
-        heartBeat.startupTime = Long.parseLong(parts[6]);
-        heartBeat.reportTime = Long.parseLong(parts[7]);
-        heartBeat.serverStatus = Integer.parseInt(parts[8]);
-        heartBeat.processId = Integer.parseInt(parts[9]);
-        heartBeat.workerHostWeight = Integer.parseInt(parts[10]);
-        heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]);
-        heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]);
-        heartBeat.diskAvailable = Double.parseDouble(parts[13]);
-        return heartBeat;
-    }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 1a09ae11ef..b94e34e510 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -17,28 +17,6 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
-import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
-import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
-import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TimeZone;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -55,6 +33,26 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * json utils
@@ -130,7 +128,7 @@ public class JSONUtils {
      * @return an object of type T from the string
      * classOfT
      */
-    public static <T> T parseObject(String json, Class<T> clazz) {
+    public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
         if (StringUtils.isEmpty(json)) {
             return null;
         }
@@ -138,7 +136,7 @@ public class JSONUtils {
         try {
             return objectMapper.readValue(json, clazz);
         } catch (Exception e) {
-            logger.error("parse object exception!", e);
+            logger.error("Parse object exception, jsonStr: {}, class: {}", json, clazz, e);
         }
         return null;
     }
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
deleted file mode 100644
index c207f6bfb5..0000000000
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.dolphinscheduler.common.Constants;
-
-import org.junit.Test;
-
-/**
- * NetUtilsTest
- */
-public class HeartBeatTest {
-
-    @Test
-    public void testAbnormalState() {
-        long startupTime = System.currentTimeMillis();
-        double loadAverage = 100;
-        double reservedMemory = 100;
-        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory);
-        heartBeat.updateServerState();
-        assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus());
-    }
-
-    @Test
-    public void testBusyState() {
-        long startupTime = System.currentTimeMillis();
-        double loadAverage = 0;
-        double reservedMemory = 0;
-        int hostWeight = 1;
-        int taskCount = 200;
-        int workerThreadCount = 199;
-        HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount);
-
-        heartBeat.setWorkerWaitingTaskCount(taskCount);
-        heartBeat.updateServerState();
-        assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus());
-    }
-
-    @Test
-    public void testDecodeHeartBeat() throws Exception {
-        String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200,65.86";
-        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
-
-        double delta = 0.001;
-        assertEquals(0.35, heartBeat.getCpuUsage(), delta);
-        assertEquals(0.58, heartBeat.getMemoryUsage(), delta);
-        assertEquals(3.09, heartBeat.getLoadAverage(), delta);
-        assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta);
-        assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta);
-        assertEquals(1.0, heartBeat.getReservedMemory(), delta);
-        assertEquals(1634033006749L, heartBeat.getStartupTime());
-        assertEquals(1634033006857L, heartBeat.getReportTime());
-        assertEquals(1, heartBeat.getServerStatus());
-        assertEquals(29732, heartBeat.getProcessId());
-        assertEquals(199, heartBeat.getWorkerExecThreadCount());
-        assertEquals(200, heartBeat.getWorkerWaitingTaskCount());
-        assertEquals(65.86, heartBeat.getDiskAvailable(), delta);
-    }
-
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 26e2cbe02a..b0426f477a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -95,7 +95,7 @@ public class MasterConfig implements Validator {
     private String masterAddress;
 
     // /nodes/master/ip:listenPort
-    private String masterRegistryNodePath;
+    private String masterRegistryPath;
 
     @Override
     public boolean supports(Class<?> clazz) {
@@ -139,8 +139,7 @@ public class MasterConfig implements Validator {
             masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
         masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
-        masterConfig
-                .setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
+        masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
         printConfig();
     }
 
@@ -161,6 +160,6 @@ public class MasterConfig implements Validator {
         logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
         logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
         logger.info("Master config: masterAddress -> {} ", masterAddress);
-        logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath);
+        logger.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 4051068c53..bbf84a3d74 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -27,14 +27,13 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import org.springframework.beans.factory.annotation.Autowired;
-
 /**
  * common host manager
  */
@@ -80,23 +79,10 @@ public abstract class CommonHostManager implements HostManager {
         Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
         if (CollectionUtils.isNotEmpty(nodes)) {
             for (String node : nodes) {
-                String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
-                int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat);
-                hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup));
+                WorkerHeartBeat workerNodeInfo = serverNodeManager.getWorkerNodeInfo(node);
+                hostWorkers.add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup));
             }
         }
         return hostWorkers;
     }
-
-    protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) {
-        int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
-        if (!StringUtils.isEmpty(heartBeatInfo)) {
-            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
-            if (heartBeat != null) {
-                hostWeight = heartBeat.getWorkerHostWeight();
-            }
-        }
-        return hostWeight;
-    }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 5002484144..3ad46ddd53 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -17,18 +17,19 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
 import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.PostConstruct;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,11 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * lower weight host manager
  */
@@ -97,7 +93,7 @@ public class LowerWeightHostManager extends CommonHostManager {
 
     private class WorkerWeightListener implements WorkerInfoChangeListener {
         @Override
-        public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
+        public void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo) {
             syncWorkerResources(workerGroups, workerNodeInfo);
         }
     }
@@ -109,7 +105,7 @@ public class LowerWeightHostManager extends CommonHostManager {
      * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
      */
     private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
-                                     final Map<String, String> workerNodeInfoMap) {
+                                     final Map<String, WorkerHeartBeat> workerNodeInfoMap) {
         try {
             Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
             for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
@@ -117,7 +113,7 @@ public class LowerWeightHostManager extends CommonHostManager {
                 Set<String> nodes = entry.getValue();
                 Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
                 for (String node : nodes) {
-                    String heartbeat = workerNodeInfoMap.getOrDefault(node, null);
+                    WorkerHeartBeat heartbeat = workerNodeInfoMap.getOrDefault(node, null);
                     Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
                     hostWeightOpt.ifPresent(hostWeights::add);
                 }
@@ -131,13 +127,9 @@ public class LowerWeightHostManager extends CommonHostManager {
         }
     }
 
-    private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
-        if (StringUtils.isEmpty(heartBeatInfo)) {
-            logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
-            return Optional.empty();
-        }
-        HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
+    public Optional<HostWeight> getHostWeight(String addr, String workerGroup, WorkerHeartBeat heartBeat) {
         if (heartBeat == null) {
+            logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
             return Optional.empty();
         }
         if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
@@ -151,12 +143,15 @@ public class LowerWeightHostManager extends CommonHostManager {
             return Optional.empty();
         }
         return Optional.of(
-                new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
-                        heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
-                        heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
+                new HostWeight(
+                        HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
+                        heartBeat.getCpuUsage(),
+                        heartBeat.getMemoryUsage(),
+                        heartBeat.getLoadAverage(),
+                        heartBeat.getWorkerWaitingTaskCount(),
+                        heartBeat.getStartupTime()));
     }
 
-
     private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
         lock.lock();
         try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
deleted file mode 100644
index 5ca7c87f1b..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Master heart beat task
- */
-public class MasterHeartBeatTask implements Runnable {
-
-    private final Logger logger = LoggerFactory.getLogger(MasterHeartBeatTask.class);
-
-    private final Set<String> heartBeatPaths;
-    private final RegistryClient registryClient;
-    private final HeartBeat heartBeat;
-    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
-
-    public MasterHeartBeatTask(long startupTime,
-                               double maxCpuloadAvg,
-                               double reservedMemory,
-                               Set<String> heartBeatPaths,
-                               RegistryClient registryClient) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
-    }
-
-    public String getHeartBeatInfo() {
-        return this.heartBeat.encodeHeartBeat();
-    }
-
-    @Override
-    public void run() {
-        try {
-            if (!ServerLifeCycleManager.isRunning()) {
-                return;
-            }
-            for (String heartBeatPath : heartBeatPaths) {
-                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
-            }
-            heartBeatErrorTimes.set(0);
-        } catch (Throwable ex) {
-            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
-        }
-    }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 99a65e9fce..ac8d60e20e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,36 +17,28 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
+import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.time.Duration;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import com.google.common.collect.Sets;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 /**
  * <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
- * <p>When the Master node startup, it will register in registry center. And schedule a {@link MasterHeartBeatTask} to update its metadata in registry.
+ * <p>When the Master node startup, it will register in registry center. And start a {@link MasterHeartBeatTask} to update its metadata in registry.
  */
 @Component
 public class MasterRegistryClient implements AutoCloseable {
@@ -65,18 +57,11 @@ public class MasterRegistryClient implements AutoCloseable {
     @Autowired
     private MasterConnectStrategy masterConnectStrategy;
 
-    private ScheduledExecutorService heartBeatExecutor;
-
-    /**
-     * master startup time, ms
-     */
-    private long startupTime;
+    private MasterHeartBeatTask masterHeartBeatTask;
 
     public void start() {
         try {
-            this.startupTime = System.currentTimeMillis();
-            this.heartBeatExecutor =
-                    Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+            this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
             // master registry
             registry();
             registryClient.addConnectionStateListener(
@@ -166,17 +151,11 @@ public class MasterRegistryClient implements AutoCloseable {
      */
     void registry() {
         logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
-        String localNodePath = masterConfig.getMasterRegistryNodePath();
-        Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
-        MasterHeartBeatTask heartBeatTask = new MasterHeartBeatTask(startupTime,
-            masterConfig.getMaxCpuLoadAvg(),
-            masterConfig.getReservedMemory(),
-            Sets.newHashSet(localNodePath),
-            registryClient);
+        String masterRegistryPath = masterConfig.getMasterRegistryPath();
 
         // remove before persist
-        registryClient.remove(localNodePath);
-        registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
+        registryClient.remove(masterRegistryPath);
+        registryClient.persistEphemeral(masterRegistryPath, JSONUtils.toJsonString(masterHeartBeatTask.getHeartBeat()));
 
         while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
             logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
@@ -186,19 +165,18 @@ public class MasterRegistryClient implements AutoCloseable {
         // sleep 1s, waiting master failover remove
         ThreadUtils.sleep(SLEEP_TIME_MILLIS);
 
-        this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
-                TimeUnit.SECONDS);
-        logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
-                masterConfig.getMasterAddress(), masterHeartbeatInterval);
+        masterHeartBeatTask.start();
+        logger.info("Master node : {} registered to registry center successfully", masterConfig.getMasterAddress());
 
     }
 
     public void deregister() {
         try {
-            registryClient.remove(masterConfig.getMasterRegistryNodePath());
+            registryClient.remove(masterConfig.getMasterRegistryPath());
             logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
-            heartBeatExecutor.shutdown();
-            logger.info("MasterServer heartbeat executor shutdown");
+            if (masterHeartBeatTask != null) {
+                masterHeartBeatTask.shutdown();
+            }
             registryClient.close();
         } catch (Exception e) {
             logger.error("MasterServer remove registry path exception ", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 58df904fc5..af96da3f2f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,13 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
@@ -34,10 +34,13 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
+import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -52,14 +55,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.annotation.PreDestroy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 
 /**
  * server node manager
@@ -69,23 +68,19 @@ public class ServerNodeManager implements InitializingBean {
 
     private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class);
 
-    /**
-     * master lock
-     */
     private final Lock masterLock = new ReentrantLock();
 
-    /**
-     * worker group lock
-     */
-    private final Lock workerGroupLock = new ReentrantLock();
+    private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
+    private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
 
-    /**
-     * worker node info lock
-     */
-    private final Lock workerNodeInfoLock = new ReentrantLock();
+
+    private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
+    private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
 
     /**
-     * worker group nodes
+     * worker group nodes, workerGroup -> ips
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
 
@@ -94,10 +89,7 @@ public class ServerNodeManager implements InitializingBean {
      */
     private final Set<String> masterNodes = new HashSet<>();
 
-    /**
-     * worker node info
-     */
-    private final Map<String, String> workerNodeInfo = new HashMap<>();
+    private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();
 
     /**
      * executor service
@@ -108,7 +100,7 @@ public class ServerNodeManager implements InitializingBean {
     private RegistryClient registryClient;
 
     /**
-     * eg : /node/worker/group/127.0.0.1:xxx
+     * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
      */
     private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
 
@@ -244,26 +236,29 @@ public class ServerNodeManager implements InitializingBean {
             final String data = event.data();
             if (registryClient.isWorkerPath(path)) {
                 try {
+                    String[] parts = path.split("/");
+                    if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
+                        throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
+                    }
+                    final String workerGroupName = parts[parts.length - 2];
+                    final String workerAddress = parts[parts.length - 1];
+
                     if (type == Type.ADD) {
                         logger.info("worker group node : {} added.", path);
-                        String group = parseGroup(path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
                         logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(group, currentNodes);
+                        syncWorkerGroupNodes(workerGroupName, currentNodes);
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        String group = parseGroup(path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
-                        syncWorkerGroupNodes(group, currentNodes);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
+                        syncWorkerGroupNodes(workerGroupName, currentNodes);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
                         logger.debug("worker group node : {} update, data: {}", path, data);
-                        String group = parseGroup(path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
-                        syncWorkerGroupNodes(group, currentNodes);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
+                        syncWorkerGroupNodes(workerGroupName, currentNodes);
 
-                        String node = parseNode(path);
-                        syncSingleWorkerNodeInfo(node, data);
+                        syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
                     notifyWorkerInfoChangeListeners();
                 } catch (IllegalArgumentException ex) {
@@ -274,22 +269,6 @@ public class ServerNodeManager implements InitializingBean {
 
             }
         }
-
-        private String parseGroup(String path) {
-            String[] parts = path.split("/");
-            if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
-                throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
-            }
-            return parts[parts.length - 2];
-        }
-
-        private String parseNode(String path) {
-            String[] parts = path.split("/");
-            if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
-                throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
-            }
-            return parts[parts.length - 1];
-        }
     }
 
     class MasterDataListener implements SubscribeListener {
@@ -333,20 +312,6 @@ public class ServerNodeManager implements InitializingBean {
 
     }
 
-    /**
-     * get master nodes
-     *
-     * @return master nodes
-     */
-    public Set<String> getMasterNodes() {
-        masterLock.lock();
-        try {
-            return Collections.unmodifiableSet(masterNodes);
-        } finally {
-            masterLock.unlock();
-        }
-    }
-
     /**
      * sync master nodes
      *
@@ -355,18 +320,17 @@ public class ServerNodeManager implements InitializingBean {
     private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
         masterLock.lock();
         try {
-            String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
             this.masterNodes.addAll(nodes);
             this.masterPriorityQueue.clear();
             this.masterPriorityQueue.putList(masterNodes);
-            int index = masterPriorityQueue.getIndex(addr);
+            int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
             if (index >= 0) {
                 MASTER_SIZE = nodes.size();
                 MASTER_SLOT = index;
             } else {
-                logger.warn("current addr:{} is not in active master list", addr);
+                logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
             }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
+            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
         } finally {
             masterLock.unlock();
         }
@@ -379,19 +343,24 @@ public class ServerNodeManager implements InitializingBean {
      * @param nodes worker nodes
      */
     private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
-        workerGroupLock.lock();
+        workerGroupWriteLock.lock();
         try {
             Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
             workerNodes.clear();
             workerNodes.addAll(nodes);
             workerGroupNodes.put(workerGroup, workerNodes);
         } finally {
-            workerGroupLock.unlock();
+            workerGroupWriteLock.unlock();
         }
     }
 
     public Map<String, Set<String>> getWorkerGroupNodes() {
-        return Collections.unmodifiableMap(workerGroupNodes);
+        workerGroupReadLock.lock();
+        try {
+            return Collections.unmodifiableMap(workerGroupNodes);
+        } finally {
+            workerGroupReadLock.unlock();
+        }
     }
 
     /**
@@ -401,7 +370,7 @@ public class ServerNodeManager implements InitializingBean {
      * @return worker nodes
      */
     public Set<String> getWorkerGroupNodes(String workerGroup) {
-        workerGroupLock.lock();
+        workerGroupReadLock.lock();
         try {
             if (StringUtils.isEmpty(workerGroup)) {
                 workerGroup = Constants.DEFAULT_WORKER_GROUP;
@@ -412,16 +381,11 @@ public class ServerNodeManager implements InitializingBean {
             }
             return nodes;
         } finally {
-            workerGroupLock.unlock();
+            workerGroupReadLock.unlock();
         }
     }
 
-    /**
-     * get worker node info
-     *
-     * @return worker node info
-     */
-    public Map<String, String> getWorkerNodeInfo() {
+    public Map<String, WorkerHeartBeat> getWorkerNodeInfo() {
         return Collections.unmodifiableMap(workerNodeInfo);
     }
 
@@ -431,12 +395,12 @@ public class ServerNodeManager implements InitializingBean {
      * @param workerNode worker node
      * @return worker node info
      */
-    public String getWorkerNodeInfo(String workerNode) {
-        workerNodeInfoLock.lock();
+    public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
+        workerNodeInfoReadLock.lock();
         try {
             return workerNodeInfo.getOrDefault(workerNode, null);
         } finally {
-            workerNodeInfoLock.unlock();
+            workerNodeInfoReadLock.unlock();
         }
     }
 
@@ -446,24 +410,26 @@ public class ServerNodeManager implements InitializingBean {
      * @param newWorkerNodeInfo new worker node info
      */
     private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
-        workerNodeInfoLock.lock();
+        workerNodeInfoWriteLock.lock();
         try {
             workerNodeInfo.clear();
-            workerNodeInfo.putAll(newWorkerNodeInfo);
+            for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
+                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+            }
         } finally {
-            workerNodeInfoLock.unlock();
+            workerNodeInfoWriteLock.unlock();
         }
     }
 
     /**
      * sync single worker node info
      */
-    private void syncSingleWorkerNodeInfo(String node, String info) {
-        workerNodeInfoLock.lock();
+    private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
+        workerNodeInfoWriteLock.lock();
         try {
             workerNodeInfo.put(node, info);
         } finally {
-            workerNodeInfoLock.unlock();
+            workerNodeInfoWriteLock.unlock();
         }
     }
 
@@ -478,7 +444,7 @@ public class ServerNodeManager implements InitializingBean {
 
     private void notifyWorkerInfoChangeListeners() {
         Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
-        Map<String, String> workerNodeInfo = getWorkerNodeInfo();
+        Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
         for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
             listener.notify(workerGroupNodes, workerNodeInfo);
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
index f885a6fba0..9efc3517b8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+
 import java.util.Map;
 import java.util.Set;
 
@@ -31,6 +33,6 @@ public interface WorkerInfoChangeListener {
      * @param workerGroups   worker groups map, key is worker group name, value is worker address.
      * @param workerNodeInfo worker node info map, key is worker address, value is worker info.
      */
-    void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
+    void notify(Map<String, Set<String>> workerGroups, Map<String, WorkerHeartBeat> workerNodeInfo);
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
new file mode 100644
index 0000000000..53b90b7370
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.task;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+@Slf4j
+public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
+
+    private final MasterConfig masterConfig;
+
+    private final RegistryClient registryClient;
+
+    private final String heartBeatPath;
+
+    private final int processId;
+
+    public MasterHeartBeatTask(@NonNull MasterConfig masterConfig,
+                               @NonNull RegistryClient registryClient) {
+        super("MasterHeartBeatTask", masterConfig.getHeartbeatInterval().toMillis());
+        this.masterConfig = masterConfig;
+        this.registryClient = registryClient;
+        this.heartBeatPath = masterConfig.getMasterRegistryPath();
+        this.processId = OSUtils.getProcessID();
+    }
+
+    @Override
+    public MasterHeartBeat getHeartBeat() {
+        return MasterHeartBeat.builder()
+                .startupTime(ServerLifeCycleManager.getServerStartupTime())
+                .reportTime(System.currentTimeMillis())
+                .cpuUsage(OSUtils.cpuUsage())
+                .loadAverage(OSUtils.loadAverage())
+                .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
+                .maxCpuloadAvg(masterConfig.getMaxCpuLoadAvg())
+                .reservedMemory(masterConfig.getReservedMemory())
+                .processId(processId)
+                .build();
+    }
+
+    @Override
+    public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
+        String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
+        registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
+        log.info("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
+                heartBeatPath, masterHeartBeatJson);
+    }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
index c8121d494b..c0c554e43b 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host;
 
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -55,6 +56,7 @@ public class RoundRobinHostManagerTest {
     @Test
     public void testSelectWithResult() {
         Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
+        Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new WorkerHeartBeat());
         ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
         Host host = roundRobinHostManager.select(context);
         Assert.assertTrue(!Strings.isNullOrEmpty(host.getAddress()));
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index cc17d22df7..b71e267216 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
@@ -69,6 +70,12 @@ public class MasterRegistryClientTest {
     @Mock
     private ProcessService processService;
 
+    @Mock
+    private MasterConnectStrategy masterConnectStrategy;
+
+    @Mock
+    private MasterHeartBeatTask masterHeartBeatTask;
+
     @Mock
     private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
 
@@ -81,6 +88,7 @@ public class MasterRegistryClientTest {
 
         });
         ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
+        ReflectionTestUtils.setField(masterRegistryClient, "masterHeartBeatTask", masterHeartBeatTask);
 
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(1);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index a6c6218fee..508ba293aa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -19,11 +19,13 @@ package org.apache.dolphinscheduler.service.registry;
 
 import com.google.common.base.Strings;
 import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.Registry;
@@ -94,21 +96,33 @@ public class RegistryClient {
 
         List<Server> serverList = new ArrayList<>();
         for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
-            HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue());
-            if (heartBeat == null) {
+            String serverPath = entry.getKey();
+            String heartBeatJson = entry.getValue();
+            if (StringUtils.isEmpty(heartBeatJson)) {
+                logger.error("The heartBeatJson is empty, serverPath: {}", serverPath);
                 continue;
             }
-
             Server server = new Server();
-            server.setResInfo(JSONUtils.toJsonString(heartBeat));
-            server.setCreateTime(new Date(heartBeat.getStartupTime()));
-            server.setLastHeartbeatTime(new Date(heartBeat.getReportTime()));
-            server.setId(heartBeat.getProcessId());
+            switch (nodeType) {
+                case MASTER:
+                    MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(heartBeatJson, MasterHeartBeat.class);
+                    server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
+                    server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
+                    server.setId(masterHeartBeat.getProcessId());
+                    break;
+                case WORKER:
+                    WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class);
+                    server.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
+                    server.setLastHeartbeatTime(new Date(workerHeartBeat.getReportTime()));
+                    server.setId(workerHeartBeat.getProcessId());
+                    break;
+            }
 
-            String key = entry.getKey();
-            server.setZkDirectory(parentPath + "/" + key);
+            server.setResInfo(heartBeatJson);
+            // todo: add host, port in heartBeat Info, so that we don't need to parse this again
+            server.setZkDirectory(parentPath + "/" + serverPath);
             // set host and port
-            String[] hostAndPort = key.split(COLON);
+            String[] hostAndPort = serverPath.split(COLON);
             String[] hosts = hostAndPort[0].split(DIVISION_STRING);
             // fetch the last one
             server.setHost(hosts[hosts.length - 1]);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f29f93eed3..555b5bc770 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
@@ -95,6 +96,9 @@ public class WorkerServer implements IStoppable {
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    @Autowired
+    private WorkerConfig workerConfig;
+
     /**
      * worker server startup, not use web service
      *
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index f50e6f2d03..a7e6927415 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.config;
 
 import com.google.common.collect.Sets;
 import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
 import org.slf4j.Logger;
@@ -31,6 +32,9 @@ import org.springframework.validation.annotation.Validated;
 
 import java.time.Duration;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 
 @Data
 @Validated
@@ -57,6 +61,7 @@ public class WorkerConfig implements Validator {
      * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
      */
     private String workerAddress;
+    private Set<String> workerGroupRegistryPaths;
 
     @Override
     public boolean supports(Class<?> clazz) {
@@ -76,6 +81,18 @@ public class WorkerConfig implements Validator {
             workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
         workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
+
+        workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
+        if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
+            errors.rejectValue("groups", null, "should not be empty");
+        }
+
+        Set<String> workerRegistryPaths = workerConfig.getGroups()
+                .stream()
+                .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
+                .collect(Collectors.toSet());
+
+        workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
         printConfig();
     }
 
@@ -93,5 +110,6 @@ public class WorkerConfig implements Validator {
         logger.info("Worker config: alertListenPort -> {}", alertListenPort);
         logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
         logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
+        logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
deleted file mode 100644
index 84506753be..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerHeartBeatTask.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.registry;
-
-import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
-import org.apache.dolphinscheduler.common.utils.HeartBeat;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Heart beat task
- */
-public class WorkerHeartBeatTask implements Runnable {
-
-    private final Logger logger = LoggerFactory.getLogger(WorkerHeartBeatTask.class);
-
-    private final Set<String> heartBeatPaths;
-    private final RegistryClient registryClient;
-    private int workerWaitingTaskCount;
-    private final HeartBeat heartBeat;
-
-    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
-
-    public WorkerHeartBeatTask(long startupTime,
-                         double maxCpuloadAvg,
-                         double reservedMemory,
-                         int hostWeight,
-                         Set<String> heartBeatPaths,
-                         RegistryClient registryClient,
-                         int workerThreadCount,
-                         int workerWaitingTaskCount) {
-        this.heartBeatPaths = heartBeatPaths;
-        this.registryClient = registryClient;
-        this.workerWaitingTaskCount = workerWaitingTaskCount;
-        this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
-    }
-
-    public String getHeartBeatInfo() {
-        return this.heartBeat.encodeHeartBeat();
-    }
-
-    @Override
-    public void run() {
-        try {
-            if (!ServerLifeCycleManager.isRunning()) {
-                return;
-            }
-            heartBeat.setStartupTime(ServerLifeCycleManager.getServerStartupTime());
-            // update waiting task count
-            heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
-
-            for (String heartBeatPath : heartBeatPaths) {
-                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
-            }
-            heartBeatErrorTimes.set(0);
-        } catch (Throwable ex) {
-            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
-        }
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 5c3f7bf507..d2147d1559 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -17,80 +17,52 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 import java.io.IOException;
-import java.util.Set;
-import java.util.StringJoiner;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
-/**
- * worker registry
- */
+@Slf4j
 @Service
 public class WorkerRegistryClient implements AutoCloseable {
 
-    private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
-
-    /**
-     * worker config
-     */
     @Autowired
     private WorkerConfig workerConfig;
 
-    /**
-     * worker manager
-     */
     @Autowired
     private WorkerManagerThread workerManagerThread;
 
-    /**
-     * heartbeat executor
-     */
-    private ScheduledExecutorService heartBeatExecutor;
-
     @Autowired
     private RegistryClient registryClient;
 
     @Autowired
     private WorkerConnectStrategy workerConnectStrategy;
 
-    /**
-     * worker startup time, ms
-     */
-    private long startupTime;
+    private WorkerHeartBeatTask workerHeartBeatTask;
 
-    private Set<String> workerGroups;
 
     @PostConstruct
     public void initWorkRegistry() {
-        this.workerGroups = workerConfig.getGroups();
-        this.startupTime = System.currentTimeMillis();
-        this.heartBeatExecutor =
-                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+        this.workerHeartBeatTask = new WorkerHeartBeatTask(
+                workerConfig,
+                registryClient,
+                () -> workerManagerThread.getWaitSubmitQueueSize());
     }
 
     public void start() {
@@ -107,24 +79,13 @@ public class WorkerRegistryClient implements AutoCloseable {
      * registry
      */
     private void registry() {
-        String address = NetUtils.getAddr(workerConfig.getListenPort());
-        Set<String> workerZkPaths = getWorkerZkPaths();
-        long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
-
-        WorkerHeartBeatTask heartBeatTask = new WorkerHeartBeatTask(startupTime,
-            workerConfig.getMaxCpuLoadAvg(),
-            workerConfig.getReservedMemory(),
-            workerConfig.getHostWeight(),
-            workerZkPaths,
-            registryClient,
-            workerConfig.getExecThreads(),
-            workerManagerThread.getThreadPoolQueueSize());
-
-        for (String workerZKPath : workerZkPaths) {
+        WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
+
+        for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
             // remove before persist
             registryClient.remove(workerZKPath);
-            registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo());
-            logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
+            registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
+            log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
         }
 
         while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
@@ -134,37 +95,9 @@ public class WorkerRegistryClient implements AutoCloseable {
         // sleep 1s, waiting master failover remove
         ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
 
-        this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval,
-                TimeUnit.SECONDS);
-        logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
-    }
-
-    /**
-     * get worker path
-     */
-    public Set<String> getWorkerZkPaths() {
-        Set<String> workerPaths = Sets.newHashSet();
-        String address = getLocalAddress();
-
-        for (String workGroup : this.workerGroups) {
-            StringJoiner workerPathJoiner = new StringJoiner(SINGLE_SLASH);
-            workerPathJoiner.add(REGISTRY_DOLPHINSCHEDULER_WORKERS);
-            if (Strings.isNullOrEmpty(workGroup)) {
-                workGroup = DEFAULT_WORKER_GROUP;
-            }
-            // trim and lower case is need
-            workerPathJoiner.add(workGroup.trim().toLowerCase());
-            workerPathJoiner.add(address);
-            workerPaths.add(workerPathJoiner.toString());
-        }
-        return workerPaths;
-    }
 
-    /**
-     * get local address
-     */
-    private String getLocalAddress() {
-        return NetUtils.getAddr(workerConfig.getListenPort());
+        workerHeartBeatTask.start();
+        log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
     }
 
     public void setRegistryStoppable(IStoppable stoppable) {
@@ -173,12 +106,11 @@ public class WorkerRegistryClient implements AutoCloseable {
 
     @Override
     public void close() throws IOException {
-        if (heartBeatExecutor != null) {
-            heartBeatExecutor.shutdownNow();
-            logger.info("Heartbeat executor shutdown");
+        if (workerHeartBeatTask != null) {
+            workerHeartBeatTask.shutdown();
         }
         registryClient.close();
-        logger.info("registry client closed");
+        log.info("Worker registry client closed");
     }
 
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
new file mode 100644
index 0000000000..672135613a
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.function.Supplier;
+
+@Slf4j
+public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
+
+    private final WorkerConfig workerConfig;
+    private final RegistryClient registryClient;
+
+    private final Supplier<Integer> workerWaitingTaskCount;
+
+    private final int processId;
+
+    public WorkerHeartBeatTask(@NonNull WorkerConfig workerConfig,
+                               @NonNull RegistryClient registryClient,
+                               @NonNull Supplier<Integer> workerWaitingTaskCount) {
+        super("WorkerHeartBeatTask", workerConfig.getHeartbeatInterval().toMillis());
+        this.workerConfig = workerConfig;
+        this.registryClient = registryClient;
+        this.workerWaitingTaskCount = workerWaitingTaskCount;
+        this.processId = OSUtils.getProcessID();
+    }
+
+    @Override
+    public WorkerHeartBeat getHeartBeat() {
+        double loadAverage = OSUtils.loadAverage();
+        double cpuUsage = OSUtils.cpuUsage();
+        int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
+        double reservedMemory = workerConfig.getReservedMemory();
+        double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+        int execThreads = workerConfig.getExecThreads();
+        int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
+        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, execThreads, workerWaitingTaskCount);
+
+        return WorkerHeartBeat.builder()
+                .startupTime(ServerLifeCycleManager.getServerStartupTime())
+                .reportTime(System.currentTimeMillis())
+                .cpuUsage(cpuUsage)
+                .loadAverage(loadAverage)
+                .availablePhysicalMemorySize(availablePhysicalMemorySize)
+                .maxCpuloadAvg(maxCpuLoadAvg)
+                .reservedMemory(reservedMemory)
+                .processId(processId)
+                .workerHostWeight(workerConfig.getHostWeight())
+                .workerWaitingTaskCount(this.workerWaitingTaskCount.get())
+                .workerExecThreadCount(workerConfig.getExecThreads())
+                .serverStatus(serverStatus)
+                .build();
+    }
+
+    @Override
+    public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
+        String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
+        for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
+            registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
+        }
+        log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
+                workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
+    }
+
+    public int getServerStatus(double loadAverage,
+                               double maxCpuloadAvg,
+                               double availablePhysicalMemorySize,
+                               double reservedMemory,
+                               int workerExecThreadCount,
+                               int workerWaitingTaskCount) {
+        if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
+            log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+                    loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
+            return Constants.ABNORMAL_NODE_STATUS;
+        } else if (workerWaitingTaskCount > workerExecThreadCount) {
+            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
+            return Constants.BUSY_NODE_STATUE;
+        } else {
+            return Constants.NORMAL_NODE_STATUS;
+        }
+    }
+}