You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:55 UTC

[dolphinscheduler] 28/29: Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit f3250bf5fa6c33cacb92bc31cdcb1d734e8cc27b
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Jul 15 20:06:53 2022 +0800

    Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)
    
    * Use try-with-resources to close resources, and add a heartbeat error threshold so that the worker can still shut down when the heartbeat check keeps failing
    
    * Move heartbeat error threshold to application.yaml
    
    (cherry picked from commit 2be1d4bf0add2485d4bf322aef7bbaeabfc223de)
---
 .../apache/dolphinscheduler/common/Constants.java  |  1 +
 .../common/thread/ThreadUtils.java                 | 14 +++++---
 .../server/master/MasterServer.java                | 36 +++++++++----------
 .../server/master/config/MasterConfig.java         |  7 ++++
 .../master/registry/MasterRegistryClient.java      |  8 +++--
 .../master/runner/MasterSchedulerBootstrap.java    |  3 +-
 .../src/main/resources/application.yaml            |  2 ++
 .../server/registry/HeartBeatTask.java             | 19 ++++++++--
 .../service/bean/SpringApplicationContext.java     |  3 +-
 .../apache/dolphinscheduler/StandaloneServer.java  | 32 +++++++++++++++--
 .../src/main/resources/application.yaml            |  4 +++
 .../server/worker/WorkerServer.java                | 41 +++++++---------------
 .../server/worker/config/WorkerConfig.java         |  7 ++++
 .../worker/registry/WorkerRegistryClient.java      | 36 +++++++++++--------
 .../src/main/resources/application.yaml            |  2 ++
 15 files changed, 137 insertions(+), 78 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 5b813b22c7..b4397f007a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -50,6 +50,7 @@ public final class Constants {
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
+
     public static final String FORMAT_SS = "%s%s";
     public static final String FORMAT_S_S = "%s/%s";
     public static final String FOLDER_SEPARATOR = "/";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5c8020b7cd..f4f2a17bc7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.experimental.UtilityClass;
 
 @UtilityClass
 public class ThreadUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
+
     /**
      * Wrapper over newDaemonFixedThreadExecutor.
      *
@@ -35,10 +41,7 @@ public class ThreadUtils {
      * @return ExecutorService
      */
     public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
-        ThreadFactory threadFactory = new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(threadName)
-            .build();
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
         return Executors.newFixedThreadPool(threadsNum, threadFactory);
     }
 
@@ -48,8 +51,9 @@ public class ThreadUtils {
     public static void sleep(final long millis) {
         try {
             Thread.sleep(millis);
-        } catch (final InterruptedException ignore) {
+        } catch (final InterruptedException interruptedException) {
             Thread.currentThread().interrupt();
+            logger.error("Current thread sleep error", interruptedException);
         }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 66b258d5b2..6711ff3289 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -115,31 +115,27 @@ public class MasterServer implements IStoppable {
      * @param cause close cause
      */
     public void close(String cause) {
-
-        try {
-            // set stop signal is true
-            // execute only once
-            if (!Stopper.stop()) {
-                logger.warn("MasterServer is already stopped, current cause: {}", cause);
-                return;
-            }
+        // set stop signal is true
+        // execute only once
+        if (!Stopper.stop()) {
+            logger.warn("MasterServer is already stopped, current cause: {}", cause);
+            return;
+        }
+        // thread sleep 3 seconds for thread quietly stop
+        ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+        try (MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
+             MasterRPCServer closedRpcServer = masterRPCServer;
+             MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
+             // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
+             // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+             SpringApplicationContext closedSpringContext = springApplicationContext) {
 
             logger.info("Master server is stopping, current cause : {}", cause);
-
-            // thread sleep 3 seconds for thread quietly stop
-            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
-            // close
-            this.masterSchedulerBootstrap.close();
-            this.masterRPCServer.close();
-            this.masterRegistryClient.closeRegistry();
-            // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
-            // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
-            springApplicationContext.close();
-
-            logger.info("MasterServer stopped, current cause: {}", cause);
         } catch (Exception e) {
             logger.error("MasterServer stop failed, current cause: {}", cause, e);
+            return;
         }
+        logger.info("MasterServer stopped, current cause: {}", cause);
     }
 
     @Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index fe51e20984..7f6f124164 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
      * Master heart beat task execute interval.
      */
     private Duration heartbeatInterval = Duration.ofSeconds(10);
+    /**
+     * Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
+     */
+    private int heartbeatErrorThreshold = 5;
     /**
      * task submit max retry times.
      */
@@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
         if (masterConfig.getMaxCpuLoadAvg() <= 0) {
             masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
+            errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
+        }
         masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 486c360117..a59b712828 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
  * <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
  */
 @Component
-public class MasterRegistryClient {
+public class MasterRegistryClient implements AutoCloseable {
 
     /**
      * logger
@@ -108,7 +108,8 @@ public class MasterRegistryClient {
         registryClient.setStoppable(stoppable);
     }
 
-    public void closeRegistry() {
+    @Override
+    public void close() {
         // TODO unsubscribe MasterRegistryDataListener
         deregister();
     }
@@ -194,7 +195,8 @@ public class MasterRegistryClient {
                                                         masterConfig.getReservedMemory(),
                                                         Sets.newHashSet(localNodePath),
                                                         Constants.MASTER_TYPE,
-                                                        registryClient);
+                                                        registryClient,
+                                                        masterConfig.getHeartbeatErrorThreshold());
 
         // remove before persist
         registryClient.remove(localNodePath);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index b013a0e9c4..523e5f839d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -57,7 +57,7 @@ import org.springframework.stereotype.Service;
  * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
 @Service
-public class MasterSchedulerBootstrap extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
 
@@ -112,6 +112,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
         logger.info("Master schedule bootstrap started...");
     }
 
+    @Override
     public void close() {
         logger.info("Master schedule bootstrap stopping...");
         logger.info("Master schedule bootstrap stopped...");
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index ff29b19389..b559806d28 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -98,6 +98,8 @@ master:
   host-selector: lower_weight
   # master heartbeat interval
   heartbeat-interval: 10s
+  # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
+  heartbeat-error-threshold: 5
   # master commit task retry times
   task-commit-retry-times: 5
   # master commit task interval
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 129aaff0de..c84abb4182 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
     private final String serverType;
     private final HeartBeat heartBeat;
 
+    private final int heartBeatErrorThreshold;
+
+    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
+
     public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          Set<String> heartBeatPaths,
                          String serverType,
-                         RegistryClient registryClient) {
+                         RegistryClient registryClient,
+                         int heartBeatErrorThreshold) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
+        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public HeartBeatTask(long startupTime,
@@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
                          String serverType,
                          RegistryClient registryClient,
                          int workerThreadCount,
-                         int workerWaitingTaskCount
-    ) {
+                         int workerWaitingTaskCount,
+                         int heartBeatErrorThreshold) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.workerWaitingTaskCount = workerWaitingTaskCount;
         this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
+        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public String getHeartBeatInfo() {
@@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
             for (String heartBeatPath : heartBeatPaths) {
                 registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
             }
+            heartBeatErrorTimes.set(0);
         } catch (Throwable ex) {
             logger.error("HeartBeat task execute failed", ex);
+            if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
+                registryClient.getStoppable()
+                              .stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
+            }
         }
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index 61dfcb35d7..5b37b1f72d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext;
 import org.springframework.stereotype.Component;
 
 @Component
-public class SpringApplicationContext implements ApplicationContextAware {
+public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable {
 
     private static ApplicationContext applicationContext;
 
@@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
     /**
      * Close this application context, destroying all beans in its bean factory.
      */
+    @Override
     public void close() {
         ((AbstractApplicationContext)applicationContext).close();
     }
diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
index 3866c53066..728fed8eaa 100644
--- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
+++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
@@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
 
 import org.apache.curator.test.TestingServer;
 
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationFailedEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
+
+import lombok.NonNull;
 
 @SpringBootApplication
-public class StandaloneServer {
+public class StandaloneServer implements ApplicationListener<ApplicationEvent> {
+
+    private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class);
+
+    private static TestingServer zookeeperServer;
 
     public static void main(String[] args) throws Exception {
-        final TestingServer server = new TestingServer(true);
-        System.setProperty("registry.zookeeper.connect-string", server.getConnectString());
+        zookeeperServer = new TestingServer(true);
+        System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString());
         SpringApplication.run(StandaloneServer.class, args);
     }
+
+    @Override
+    public void onApplicationEvent(@NonNull ApplicationEvent event) {
+        if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) {
+            try (TestingServer closedServer = zookeeperServer) {
+                // close the zookeeper server
+                logger.info("Receive spring context close event: {}, will closed zookeeper server", event);
+            } catch (IOException e) {
+                logger.error("Close zookeeper server error", e);
+            }
+        }
+    }
 }
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 9a940b6c0e..8411360275 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -116,6 +116,8 @@ master:
   host-selector: lower_weight
   # master heartbeat interval
   heartbeat-interval: 10s
+  # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
+  heartbeat-error-threshold: 5
   # master commit task retry times
   task-commit-retry-times: 5
   # master commit task interval
@@ -137,6 +139,8 @@ worker:
   exec-threads: 10
   # worker heartbeat interval
   heartbeat-interval: 10s
+  # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
+  heartbeat-error-threshold: 5
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # worker tenant auto create
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 0321b76f44..471bc008eb 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.Collection;
-import java.util.Set;
 
 import javax.annotation.PostConstruct;
 
@@ -111,8 +110,7 @@ public class WorkerServer implements IStoppable {
 
         this.workerRegistryClient.registry();
         this.workerRegistryClient.setRegistryStoppable(this);
-        Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
-        this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
+        this.workerRegistryClient.handleDeadServer();
 
         this.workerManagerThread.start();
 
@@ -129,37 +127,24 @@ public class WorkerServer implements IStoppable {
     }
 
     public void close(String cause) {
-        try {
-            // execute only once
-            // set stop signal is true
-            if (!Stopper.stop()) {
-                logger.warn("WorkerServer is already stopped, current cause: {}", cause);
-                return;
-            }
+        if (!Stopper.stop()) {
+            logger.warn("WorkerServer is already stopped, current cause: {}", cause);
+            return;
+        }
+        ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
 
+        try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
+             WorkerRegistryClient closedRegistryClient = workerRegistryClient;
+             AlertClientService closedAlertClientService = alertClientService;
+             SpringApplicationContext closedSpringContext = springApplicationContext;) {
             logger.info("Worker server is stopping, current cause : {}", cause);
-
-            try {
-                // thread sleep 3 seconds for thread quitely stop
-                Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
-            } catch (Exception e) {
-                logger.warn("Worker server close wait error", e);
-            }
-
-            // close
-            this.workerRpcServer.close();
-            this.workerRegistryClient.unRegistry();
-            this.alertClientService.close();
-
             // kill running tasks
             this.killAllRunningTasks();
-
-            // close the application context
-            this.springApplicationContext.close();
-            logger.info("Worker server stopped, current cause: {}", cause);
         } catch (Exception e) {
             logger.error("Worker server stop failed, current cause: {}", cause, e);
+            return;
         }
+        logger.info("Worker server stopped, current cause: {}", cause);
     }
 
     @Override
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index aab162d6d4..2367975715 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
     private int listenPort = 1234;
     private int execThreads = 10;
     private Duration heartbeatInterval = Duration.ofSeconds(10);
+    /**
+     * Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
+     */
+    private int heartbeatErrorThreshold = 5;
     private int hostWeight = 100;
     private boolean tenantAutoCreate = true;
     private boolean tenantDistributedUser = false;
@@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
         if (workerConfig.getMaxCpuLoadAvg() <= 0) {
             workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
+            errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
+        }
         workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b33b3ef0d7..99d18acc30 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Sets;
  * worker registry
  */
 @Service
-public class WorkerRegistryClient {
+public class WorkerRegistryClient implements AutoCloseable {
 
     private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
 
@@ -102,15 +102,15 @@ public class WorkerRegistryClient {
         long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                workerConfig.getMaxCpuLoadAvg(),
-                workerConfig.getReservedMemory(),
-                workerConfig.getHostWeight(),
-                workerZkPaths,
-                Constants.WORKER_TYPE,
-                registryClient,
-                workerConfig.getExecThreads(),
-                workerManagerThread.getThreadPoolQueueSize()
-        );
+                                                        workerConfig.getMaxCpuLoadAvg(),
+                                                        workerConfig.getReservedMemory(),
+                                                        workerConfig.getHostWeight(),
+                                                        workerZkPaths,
+                                                        Constants.WORKER_TYPE,
+                                                        registryClient,
+                                                        workerConfig.getExecThreads(),
+                                                        workerManagerThread.getThreadPoolQueueSize(),
+                                                        workerConfig.getHeartbeatErrorThreshold());
 
         for (String workerZKPath : workerZkPaths) {
             // remove before persist
@@ -148,8 +148,10 @@ public class WorkerRegistryClient {
             logger.error("remove worker zk path exception", ex);
         }
 
-        this.heartBeatExecutor.shutdownNow();
-        logger.info("heartbeat executor shutdown");
+        if (heartBeatExecutor != null) {
+            heartBeatExecutor.shutdownNow();
+            logger.info("Heartbeat executor shutdown");
+        }
 
         registryClient.close();
         logger.info("registry client closed");
@@ -176,8 +178,9 @@ public class WorkerRegistryClient {
         return workerPaths;
     }
 
-    public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
-        registryClient.handleDeadServer(nodeSet, nodeType, opType);
+    public void handleDeadServer() {
+        Set<String> workerZkPaths = getWorkerZkPaths();
+        registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
     }
 
     /**
@@ -191,4 +194,9 @@ public class WorkerRegistryClient {
         registryClient.setStoppable(stoppable);
     }
 
+    @Override
+    public void close() throws IOException {
+        unRegistry();
+    }
+
 }
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index d764f9d25b..7d653edc24 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -60,6 +60,8 @@ worker:
   exec-threads: 100
   # worker heartbeat interval
   heartbeat-interval: 10s
+  # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
+  heartbeat-error-threshold: 5
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # worker tenant auto create