You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/13 01:52:15 UTC

[dolphinscheduler] branch dev updated: [Feature-7024] Add waiting strategy to support master/worker can recover from registry lost (#11368)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ff34c3947 [Feature-7024] Add waiting strategy to support master/worker can recover from registry lost (#11368)
7ff34c3947 is described below

commit 7ff34c3947102afc04645a7f50fe49b7d7b3c75e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Aug 13 09:52:03 2022 +0800

    [Feature-7024] Add waiting strategy to support master/worker can recover from registry lost (#11368)
    
    * Add waiting strategy to support master/worker can recover from registry lost
    
    * throw exception when zookeeper registry start failed due to interrupted
---
 docs/docs/en/architecture/configuration.md         |   4 +
 docs/docs/zh/architecture/configuration.md         |   4 +
 .../dolphinscheduler/alert/AlertSenderService.java |  20 +-
 .../apache/dolphinscheduler/alert/AlertServer.java |   5 +-
 .../service/impl/ProcessDefinitionServiceImpl.java | 609 ++++++++++++---------
 dolphinscheduler-bom/pom.xml                       | 101 ++--
 .../apache/dolphinscheduler/common/Constants.java  |  29 +-
 .../common/lifecycle/ServerLifeCycleException.java |  29 +
 .../common/lifecycle/ServerLifeCycleManager.java   |  75 +++
 .../common/lifecycle/ServerStatus.java             |  45 ++
 .../dolphinscheduler/common/thread/Stopper.java    |  58 --
 .../server/master/MasterServer.java                |  28 +-
 .../cache/ProcessInstanceExecCacheManager.java     |   3 +
 .../impl/ProcessInstanceExecCacheManagerImpl.java  |   8 +-
 .../server/master/config/MasterConfig.java         |  53 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |  48 +-
 .../dispatch/executor/NettyExecutorManager.java    |  12 +-
 .../server/master/event/WorkflowEventQueue.java    |   3 +
 .../processor/queue/StateEventResponseService.java |   4 +-
 .../master/processor/queue/TaskEventService.java   |  21 +-
 .../master/registry/MasterConnectStrategy.java     |  24 +
 .../registry/MasterConnectionStateListener.java    |  27 +-
 .../master/registry/MasterRegistryClient.java      |  94 +---
 .../server/master/registry/MasterStopStrategy.java |  58 ++
 .../master/registry/MasterWaitingStrategy.java     | 134 +++++
 .../server/master/rpc/MasterRPCServer.java         |   9 +-
 .../server/master/runner/EventExecuteService.java  |   4 +-
 .../master/runner/FailoverExecuteThread.java       |   9 +-
 .../master/runner/MasterSchedulerBootstrap.java    |  70 ++-
 .../master/runner/StateWheelExecuteThread.java     |  11 +-
 .../server/master/runner/WorkflowEventLooper.java  |  33 +-
 .../src/main/resources/application.yaml            |   7 +-
 .../server/master/DependentTaskTest.java           |  13 +-
 .../server/master/SubProcessTaskTest.java          |   8 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    |   4 +-
 .../master/dispatch/ExecutorDispatcherTest.java    |   6 +-
 .../master/registry/MasterRegistryClientTest.java  |   9 +-
 .../server/master/service/FailoverServiceTest.java |   2 -
 .../registry/api/ConnectStrategy.java              |  31 ++
 .../registry/api/ConnectStrategyProperties.java    |  31 ++
 .../dolphinscheduler/registry/api/Registry.java    |  11 +
 .../registry/api/StrategyType.java                 |  25 +
 .../plugin/registry/mysql/MysqlRegistry.java       |  46 +-
 .../registry/mysql/task/EphemeralDateManager.java  |   9 +-
 .../registry/zookeeper/ZookeeperRegistry.java      |  97 ++--
 .../server/registry/HeartBeatTask.java             |  36 +-
 .../dolphinscheduler/service/cron/CronUtils.java   |  69 ++-
 .../service/registry/RegistryClient.java           |  85 +--
 .../server/worker/WorkerServer.java                |  49 +-
 .../server/worker/config/WorkerConfig.java         |  43 +-
 .../server/worker/message/MessageRetryRunner.java  |  25 +-
 .../worker/registry/WorkerConnectStrategy.java     |  24 +
 .../registry/WorkerConnectionStateListener.java    |  35 +-
 .../worker/registry/WorkerRegistryClient.java      | 102 ++--
 .../server/worker/registry/WorkerStopStrategy.java |  55 ++
 .../worker/registry/WorkerWaitingStrategy.java     | 135 +++++
 .../server/worker/runner/WorkerExecService.java    |  18 +-
 .../server/worker/runner/WorkerManagerThread.java  |  35 +-
 .../src/main/resources/application.yaml            |   7 +-
 .../worker/registry/WorkerRegistryClientTest.java  |  19 +-
 60 files changed, 1700 insertions(+), 968 deletions(-)

diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index aa15fb4651..5e757b6d6f 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -268,6 +268,8 @@ Location: `master-server/conf/application.yaml`
 |master.reserved-memory|0.3|master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G|
 |master.failover-interval|10|failover interval, the unit is minute|
 |master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance|
+|master.registry-disconnect-strategy.strategy|stop|The strategy used when the master disconnects from the registry. Default value: stop. Optional values: stop, waiting|
+|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnects from the registry and the disconnect strategy is waiting. The master will wait up to this long to reconnect to the registry; if it still cannot connect after the waiting time has elapsed, it will stop itself. If the value is 0s, the master will wait indefinitely|
 
 
 ### Worker Server related configuration
@@ -285,6 +287,8 @@ Location: `worker-server/conf/application.yaml`
 |worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test' <br> worker will join corresponding group according to this config when startup|
 |worker.alert-listen-host|localhost|the alert listen host of worker|
 |worker.alert-listen-port|50052|the alert listen port of worker|
+|worker.registry-disconnect-strategy.strategy|stop|The strategy used when the worker disconnects from the registry. Default value: stop. Optional values: stop, waiting|
+|worker.registry-disconnect-strategy.max-waiting-time|100s|Used when the worker disconnects from the registry and the disconnect strategy is waiting. The worker will wait up to this long to reconnect to the registry; if it still cannot connect after the waiting time has elapsed, it will stop itself. If the value is 0s, the worker will wait indefinitely |
 
 ### Alert Server related configuration
 Location: `alert-server/conf/application.yaml`
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index 1097b50786..5303893ce5 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -255,6 +255,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
 |master.reserved-memory|0.3|master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G|
 |master.failover-interval|10|failover间隔,单位为分钟|
 |master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job|
+|master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
+|master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
 
 ## Worker Server相关配置
 位置:`worker-server/conf/application.yaml`
@@ -270,6 +272,8 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
 |worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test' <br> worker启动时会根据该配置自动加入对应的分组|
 |worker.alert-listen-host|localhost|alert监听host|
 |worker.alert-listen-port|50052|alert监听端口|
+|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
+|worker.registry-disconnect-strategy.max-waiting-time|100s|当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
 
 
 ## Alert Server相关配置
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
index 9af74e668c..92b3cea45b 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.alert;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.alert.api.AlertChannel;
 import org.apache.dolphinscheduler.alert.api.AlertConstants;
 import org.apache.dolphinscheduler.alert.api.AlertData;
@@ -26,7 +28,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
 import org.apache.dolphinscheduler.common.enums.AlertType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
@@ -34,9 +36,11 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
 import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
 import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -44,14 +48,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import com.google.common.collect.Lists;
-
 @Service
 public final class AlertSenderService extends Thread {
 
@@ -76,7 +72,7 @@ public final class AlertSenderService extends Thread {
     @Override
     public void run() {
         logger.info("alert sender started");
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 List<Alert> alerts = alertDao.listPendingAlerts();
                 AlertServerMetrics.registerPendingAlertGauge(alerts::size);
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index 172b02b091..7c5c2288ba 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -18,7 +18,7 @@
 package org.apache.dolphinscheduler.alert;
 
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@@ -40,6 +40,7 @@ import org.springframework.context.event.EventListener;
 @SpringBootApplication
 @ComponentScan("org.apache.dolphinscheduler")
 public class AlertServer implements Closeable {
+
     private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
 
     private final PluginDao pluginDao;
@@ -94,7 +95,7 @@ public class AlertServer implements Closeable {
         try {
             // set stop signal is true
             // execute only once
-            if (!Stopper.stop()) {
+            if (!ServerLifeCycleManager.toStopped()) {
                 logger.warn("AlterServer is already stopped");
                 return;
             }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e1c3f064b8..ea8e7e6eae 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -17,28 +17,14 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.COPY_SUFFIX;
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
-import static org.apache.dolphinscheduler.common.Constants.IMPORT_SUFFIX;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
-
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
 import org.apache.dolphinscheduler.api.dto.ScheduleParam;
 import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -65,9 +51,9 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
@@ -106,10 +92,16 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.web.multipart.MultipartFile;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -133,23 +125,27 @@ import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_EXPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.COPY_SUFFIX;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.EMPTY_STRING;
+import static org.apache.dolphinscheduler.common.Constants.IMPORT_SUFFIX;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMPLEX_TASK_TYPES;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
 
 /**
  * process definition service impl
@@ -245,8 +241,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                        String otherParamsJson,
                                                        ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_CREATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -265,8 +262,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
             return checkTaskDefinitions;
         }
-        List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
+        List<ProcessTaskRelationLog> taskRelationList =
+                JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+        Map<String, Object> checkRelationJson =
+                checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
         if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
             return checkRelationJson;
         }
@@ -286,19 +285,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
             return result;
         }
-        ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
-            globalParams, locations, timeout, loginUser.getId(), tenantId);
+        ProcessDefinition processDefinition =
+                new ProcessDefinition(projectCode, name, processDefinitionCode, description,
+                        globalParams, locations, timeout, loginUser.getId(), tenantId);
         processDefinition.setExecutionType(executionType);
 
         return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson);
     }
 
     protected Map<String, Object> createDagDefine(User loginUser,
-                                                List<ProcessTaskRelationLog> taskRelationList,
-                                                ProcessDefinition processDefinition,
-                                                List<TaskDefinitionLog> taskDefinitionLogs, String otherParamsJson) {
+                                                  List<ProcessTaskRelationLog> taskRelationList,
+                                                  ProcessDefinition processDefinition,
+                                                  List<TaskDefinitionLog> taskDefinitionLogs, String otherParamsJson) {
         Map<String, Object> result = new HashMap<>();
-        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
+        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(),
+                taskDefinitionLogs, Boolean.TRUE);
         if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
             logger.info("The task has not changed, so skip");
         }
@@ -311,8 +312,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
         }
-        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
-            insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
+        int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
+                processDefinition.getCode(),
+                insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
         if (insertResult == Constants.EXIT_CODE_SUCCESS) {
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
@@ -324,7 +326,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
-    private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs, String taskDefinitionJson) {
+    private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs,
+                                                        String taskDefinitionJson) {
         Map<String, Object> result = new HashMap<>();
         try {
             if (taskDefinitionLogs.isEmpty()) {
@@ -352,7 +355,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
-    private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList, String taskRelationJson, List<TaskDefinitionLog> taskDefinitionLogs) {
+    private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList,
+                                                      String taskRelationJson,
+                                                      List<TaskDefinitionLog> taskDefinitionLogs) {
         Map<String, Object> result = new HashMap<>();
         try {
             if (taskRelationList == null || taskRelationList.isEmpty()) {
@@ -361,16 +366,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
             List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
-                .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
-                .collect(Collectors.toList());
+                    .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
+                            ProcessTaskRelation.class))
+                    .collect(Collectors.toList());
             List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
             if (taskNodeList.size() != taskRelationList.size()) {
-                Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
+                Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode)
+                        .collect(Collectors.toSet());
                 Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
                 Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
                 if (CollectionUtils.isNotEmpty(codes)) {
                     logger.error("the task code is not exist");
-                    putMsg(result, Status.TASK_DEFINE_NOT_EXIST, org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA));
+                    putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
+                            org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA));
                     return result;
                 }
             }
@@ -406,8 +414,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryProcessDefinitionList(User loginUser, long projectCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -428,8 +437,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryProcessDefinitionSimpleList(User loginUser, long projectCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -460,11 +470,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return process definition page
      */
     @Override
-    public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, String otherParamsJson, Integer userId, Integer pageNo, Integer pageSize) {
+    public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal,
+                                                   String otherParamsJson, Integer userId, Integer pageNo,
+                                                   Integer pageSize) {
         Result result = new Result();
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> checkResult =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         Status resultStatus = (Status) checkResult.get(Constants.STATUS);
         if (resultStatus != Status.SUCCESS) {
             putMsg(result, resultStatus);
@@ -473,11 +486,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
         Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
         IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
-            page, searchVal, userId, project.getCode(), isAdmin(loginUser));
+                page, searchVal, userId, project.getCode(), isAdmin(loginUser));
 
         List<ProcessDefinition> records = processDefinitionIPage.getRecords();
         for (ProcessDefinition pd : records) {
-            ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
+            ProcessDefinitionLog processDefinitionLog =
+                    processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
             User user = userMapper.selectById(processDefinitionLog.getOperator());
             pd.setModifyBy(user.getUserName());
         }
@@ -502,8 +516,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryProcessDefinitionByCode(User loginUser, long projectCode, long code) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -526,8 +541,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -576,8 +592,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                        String otherParamsJson,
                                                        ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_UPDATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -590,8 +607,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
             return checkTaskDefinitions;
         }
-        List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
-        Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
+        List<ProcessTaskRelationLog> taskRelationList =
+                JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
+        Map<String, Object> checkRelationJson =
+                checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
         if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
             return checkRelationJson;
         }
@@ -625,10 +644,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
         }
-        ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
+        ProcessDefinition processDefinitionDeepCopy =
+                JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
         processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId);
         processDefinition.setExecutionType(executionType);
-        return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs, otherParamsJson);
+        return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy,
+                taskDefinitionLogs, otherParamsJson);
     }
 
     /**
@@ -639,15 +660,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @param processDefinition ProcessDefinition you change task definition and task relation
      * @param taskRelationList All the latest task relation list from process definition
      */
-    private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, List<ProcessTaskRelationLog> taskRelationList) {
-        List<ProcessTaskRelation> oldProcessTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
-        Set<ProcessTaskRelationLog> oldProcessTaskRelationSet = oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
+    private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition,
+                                          List<ProcessTaskRelationLog> taskRelationList) {
+        List<ProcessTaskRelation> oldProcessTaskRelationList = processTaskRelationMapper
+                .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
+        Set<ProcessTaskRelationLog> oldProcessTaskRelationSet =
+                oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
         StringBuilder sb = new StringBuilder();
-        for (ProcessTaskRelationLog oldProcessTaskRelation: oldProcessTaskRelationSet) {
-            boolean oldTaskExists = taskRelationList.stream().anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
+        for (ProcessTaskRelationLog oldProcessTaskRelation : oldProcessTaskRelationSet) {
+            boolean oldTaskExists = taskRelationList.stream()
+                    .anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
             if (!oldTaskExists) {
                 Optional<String> taskDepMsg = workFlowLineageService.taskDepOnTaskMsg(
-                        processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(), oldProcessTaskRelation.getPostTaskCode());
+                        processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(),
+                        oldProcessTaskRelation.getPostTaskCode());
                 taskDepMsg.ifPresent(sb::append);
             }
             if (sb.length() != 0) {
@@ -657,13 +683,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     protected Map<String, Object> updateDagDefine(User loginUser,
-                                                List<ProcessTaskRelationLog> taskRelationList,
-                                                ProcessDefinition processDefinition,
-                                                ProcessDefinition processDefinitionDeepCopy,
-                                                List<TaskDefinitionLog> taskDefinitionLogs,
-                                                String otherParamsJson) {
+                                                  List<ProcessTaskRelationLog> taskRelationList,
+                                                  ProcessDefinition processDefinition,
+                                                  ProcessDefinition processDefinitionDeepCopy,
+                                                  List<TaskDefinitionLog> taskDefinitionLogs,
+                                                  String otherParamsJson) {
         Map<String, Object> result = new HashMap<>();
-        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
+        int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(),
+                taskDefinitionLogs, Boolean.TRUE);
         if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
             logger.info("The task has not changed, so skip");
         }
@@ -673,10 +700,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         boolean isChange = false;
         if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
-            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+            List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper
+                    .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
             if (taskRelationList.size() == processTaskRelationLogList.size()) {
                 Set<ProcessTaskRelationLog> taskRelationSet = taskRelationList.stream().collect(Collectors.toSet());
-                Set<ProcessTaskRelationLog> processTaskRelationLogSet = processTaskRelationLogList.stream().collect(Collectors.toSet());
+                Set<ProcessTaskRelationLog> processTaskRelationLogSet =
+                        processTaskRelationLogList.stream().collect(Collectors.toSet());
                 if (taskRelationSet.size() == processTaskRelationLogSet.size()) {
                     taskRelationSet.removeAll(processTaskRelationLogSet);
                     if (!taskRelationSet.isEmpty()) {
@@ -693,7 +722,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         if (isChange) {
             processDefinition.setUpdateTime(new Date());
-            int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
+            int insertVersion =
+                    processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
             if (insertVersion <= 0) {
                 putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
@@ -701,7 +731,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
             taskUsedInOtherTaskValid(processDefinition, taskRelationList);
             int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
-                processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
+                    processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
             if (insertResult == Constants.EXIT_CODE_SUCCESS) {
                 putMsg(result, Status.SUCCESS);
                 result.put(Constants.DATA_LIST, processDefinition);
@@ -728,12 +758,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_CREATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        ProcessDefinition processDefinition = processDefinitionMapper.verifyByDefineName(project.getCode(), name.trim());
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.verifyByDefineName(project.getCode(), name.trim());
         if (processDefinition == null) {
             putMsg(result, Status.SUCCESS);
         } else {
@@ -756,16 +788,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
 
         // check process instances is already running
-        List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
+        List<ProcessInstance> processInstances = processInstanceService
+                .queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
         }
 
         // check process used by other task, including subprocess and dependent task type
-        Set<TaskMainInfo> taskDepOnProcess = workFlowLineageService.queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode());
+        Set<TaskMainInfo> taskDepOnProcess = workFlowLineageService
+                .queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode());
         if (CollectionUtils.isNotEmpty(taskDepOnProcess)) {
             String taskDepDetail = taskDepOnProcess.stream()
-                    .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName()))
+                    .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
+                            task.getTaskName()))
                     .collect(Collectors.joining(Constants.COMMA));
             throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
         }
@@ -783,8 +818,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Transactional
     public Map<String, Object> deleteProcessDefinitionByCode(User loginUser, long projectCode, long code) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION_DELETE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -842,10 +878,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      */
     @Override
     @Transactional
-    public Map<String, Object> releaseProcessDefinition(User loginUser, long projectCode, long code, ReleaseState releaseState) {
+    public Map<String, Object> releaseProcessDefinition(User loginUser, long projectCode, long code,
+                                                        ReleaseState releaseState) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_ONLINE_OFFLINE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -863,7 +901,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         switch (releaseState) {
             case ONLINE:
-                List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
+                List<ProcessTaskRelation> relationList =
+                        processService.findRelationByCode(code, processDefinition.getVersion());
                 if (CollectionUtils.isEmpty(relationList)) {
                     putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
                     return result;
@@ -876,7 +915,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 int updateProcess = processDefinitionMapper.updateById(processDefinition);
                 Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
                 if (updateProcess > 0 && schedule != null) {
-                    logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, schedule.getId(), code);
+                    logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}",
+                            projectCode, schedule.getId(), code);
                     // set status
                     schedule.setReleaseState(releaseState);
                     int updateSchedule = scheduleMapper.updateById(schedule);
@@ -900,24 +940,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * batch export process definition by codes
      */
     @Override
-    public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, HttpServletResponse response) {
+    public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes,
+                                                    HttpServletResponse response) {
         if (org.apache.commons.lang.StringUtils.isEmpty(codes)) {
             return;
         }
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION_EXPORT);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_EXPORT);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return;
         }
-        Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+        Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong)
+                .collect(Collectors.toSet());
         List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
         if (CollectionUtils.isEmpty(processDefinitionList)) {
             return;
         }
         // check processDefinition exist in project
-        List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream().filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList());
-        List<DagDataSchedule> dagDataSchedules = processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList());
+        List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
+                .filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList());
+        List<DagDataSchedule> dagDataSchedules =
+                processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList());
         if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
             downloadProcessDefinitionFile(response, dagDataSchedules);
         }
@@ -987,11 +1032,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         String dagDataScheduleJson = FileUtils.file2String(file);
         List<DagDataSchedule> dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class);
         Project project = projectMapper.queryByCode(projectCode);
-        result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_EXPORT);
+        result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_EXPORT);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        //check file content
+        // check file content
         if (CollectionUtils.isEmpty(dagDataScheduleList)) {
             putMsg(result, Status.DATA_IS_NULL, "fileContent");
             return result;
@@ -1009,7 +1054,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     public Map<String, Object> importSqlProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByCode(projectCode);
-        result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_IMPORT);
+        result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1034,15 +1079,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         Map<String, DataSource> dataSourceCache = new HashMap<>(1);
         Map<String, Long> taskNameToCode = new HashMap<>(16);
         Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
-        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
-             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(zIn))) {
+        try (
+                ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(zIn))) {
             // build process definition
             processDefinition = new ProcessDefinition(projectCode,
-                processDefinitionName,
-                CodeGenerateUtils.getInstance().genCode(),
-                "",
-                "[]", null,
-                0, loginUser.getId(), loginUser.getTenantId());
+                    processDefinitionName,
+                    CodeGenerateUtils.getInstance().genCode(),
+                    "",
+                    "[]", null,
+                    0, loginUser.getId(), loginUser.getTenantId());
 
             ZipEntry entry;
             while ((entry = zIn.getNextEntry()) != null) {
@@ -1060,7 +1106,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                         totalSizeArchive += nBytes;
                         long compressionRatio = totalSizeEntry / entry.getCompressedSize();
                         if (compressionRatio > THRESHOLD_RATIO) {
-                            throw new IllegalStateException("ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack");
+                            throw new IllegalStateException(
+                                    "ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack");
                         }
                         int commentIndex = line.indexOf("-- ");
                         if (commentIndex >= 0) {
@@ -1075,7 +1122,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                         break;
                                     case "upstream":
                                         upstreams = Arrays.stream(value.split(",")).map(String::trim)
-                                            .filter(s -> !"".equals(s)).collect(Collectors.toList());
+                                                .filter(s -> !"".equals(s)).collect(Collectors.toList());
                                         line = line.substring(0, commentIndex);
                                         break;
                                     case "datasource":
@@ -1113,7 +1160,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     }
                     dataSourceCache.put(datasourceName, dataSource);
 
-                    TaskDefinitionLog taskDefinition = buildNormalSqlTaskDefinition(taskName, dataSource, sql.substring(0, sql.length() - 1));
+                    TaskDefinitionLog taskDefinition =
+                            buildNormalSqlTaskDefinition(taskName, dataSource, sql.substring(0, sql.length() - 1));
 
                     taskDefinitionList.add(taskDefinition);
                     taskNameToCode.put(taskDefinition.getName(), taskDefinition.getCode());
@@ -1121,11 +1169,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 }
 
                 if (totalSizeArchive > THRESHOLD_SIZE) {
-                    throw new IllegalStateException("the uncompressed data size is too much for the application resource capacity");
+                    throw new IllegalStateException(
+                            "the uncompressed data size is too much for the application resource capacity");
                 }
 
                 if (totalEntryArchive > THRESHOLD_ENTRIES) {
-                    throw new IllegalStateException("too much entries in this archive, can lead to inodes exhaustion of the system");
+                    throw new IllegalStateException(
+                            "too much entries in this archive, can lead to inodes exhaustion of the system");
                 }
             }
         } catch (Exception e) {
@@ -1138,13 +1188,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
             List<String> upstreams = taskNameToUpstream.get(entry.getKey());
             if (CollectionUtils.isEmpty(upstreams)
-                || (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) {
+                    || (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) {
                 ProcessTaskRelationLog processTaskRelation = buildNormalTaskRelation(0, entry.getValue());
                 processTaskRelationList.add(processTaskRelation);
                 continue;
             }
             for (String upstream : upstreams) {
-                ProcessTaskRelationLog processTaskRelation = buildNormalTaskRelation(taskNameToCode.get(upstream), entry.getValue());
+                ProcessTaskRelationLog processTaskRelation =
+                        buildNormalTaskRelation(taskNameToCode.get(upstream), entry.getValue());
                 processTaskRelationList.add(processTaskRelation);
             }
         }
@@ -1165,7 +1216,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
     private DataSource queryDatasourceByNameAndUser(String datasourceName, User loginUser) {
         if (isAdmin(loginUser)) {
-            List<DataSource> dataSources  = dataSourceMapper.queryDataSourceByName(datasourceName);
+            List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(datasourceName);
             if (CollectionUtils.isNotEmpty(dataSources)) {
                 return dataSources.get(0);
             }
@@ -1175,7 +1226,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return null;
     }
 
-    private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSource dataSource, String sql) throws CodeGenerateException {
+    private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSource dataSource,
+                                                           String sql) throws CodeGenerateException {
         TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
         taskDefinition.setName(taskName);
         taskDefinition.setFlag(Flag.YES);
@@ -1206,7 +1258,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * check and import
      */
-    protected boolean checkAndImport(User loginUser, long projectCode, Map<String, Object> result, DagDataSchedule dagDataSchedule, String otherParamsJson) {
+    protected boolean checkAndImport(User loginUser, long projectCode, Map<String, Object> result,
+                                     DagDataSchedule dagDataSchedule, String otherParamsJson) {
         if (!checkImportanceParams(dagDataSchedule, result)) {
             return false;
         }
@@ -1215,8 +1268,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         // generate import processDefinitionName
         String processDefinitionName = recursionProcessDefinitionName(projectCode, processDefinition.getName(), 1);
         String importProcessDefinitionName = getNewName(processDefinitionName, IMPORT_SUFFIX);
-        //unique check
-        Map<String, Object> checkResult = verifyProcessDefinitionName(loginUser, projectCode, importProcessDefinitionName);
+        // unique check
+        Map<String, Object> checkResult =
+                verifyProcessDefinitionName(loginUser, projectCode, importProcessDefinitionName);
         if (Status.SUCCESS.equals(checkResult.get(Constants.STATUS))) {
             putMsg(result, Status.SUCCESS);
         } else {
@@ -1279,7 +1333,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
             taskRelationLogList.add(processTaskRelationLog);
         }
-        if (StringUtils.isNotEmpty(processDefinition.getLocations()) && JSONUtils.checkJsonValid(processDefinition.getLocations())) {
+        if (StringUtils.isNotEmpty(processDefinition.getLocations())
+                && JSONUtils.checkJsonValid(processDefinition.getLocations())) {
             ArrayNode arrayNode = JSONUtils.parseArray(processDefinition.getLocations());
             ArrayNode newArrayNode = JSONUtils.createArrayNode();
             for (int i = 0; i < arrayNode.size(); i++) {
@@ -1296,7 +1351,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         processDefinition.setCreateTime(new Date());
         processDefinition.setUpdateTime(new Date());
-        Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList(), otherParamsJson);
+        Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition,
+                Lists.newArrayList(), otherParamsJson);
         if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) {
             putMsg(createDagResult, Status.SUCCESS);
         } else {
@@ -1340,7 +1396,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     private String recursionProcessDefinitionName(long projectCode, String processDefinitionName, int num) {
-        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+        ProcessDefinition processDefinition =
+                processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
         if (processDefinition != null) {
             if (num > 1) {
                 String str = processDefinitionName.substring(0, processDefinitionName.length() - 3);
@@ -1361,7 +1418,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return check result code
      */
     @Override
-    public Map<String, Object> checkProcessNodeList(String processTaskRelationJson, List<TaskDefinitionLog> taskDefinitionLogsList) {
+    public Map<String, Object> checkProcessNodeList(String processTaskRelationJson,
+                                                    List<TaskDefinitionLog> taskDefinitionLogsList) {
         Map<String, Object> result = new HashMap<>();
         try {
             if (processTaskRelationJson == null) {
@@ -1370,7 +1428,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
 
-            List<ProcessTaskRelation> taskRelationList = JSONUtils.toList(processTaskRelationJson, ProcessTaskRelation.class);
+            List<ProcessTaskRelation> taskRelationList =
+                    JSONUtils.toList(processTaskRelationJson, ProcessTaskRelation.class);
             // Check whether the task node is normal
             List<TaskNode> taskNodes = processService.transformTask(taskRelationList, taskDefinitionLogsList);
 
@@ -1423,8 +1482,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long code) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+        // check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1452,26 +1511,27 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String codes) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null);
+        // check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
 
-        Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
+        Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong)
+                .collect(Collectors.toSet());
         List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
         if (CollectionUtils.isEmpty(processDefinitionList)) {
             logger.info("process definition not exists");
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
             return result;
         }
-        HashMap<Long, Project> userProjects =  new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
+        HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
         projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
-            .forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
+                .forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
 
         // check processDefinition exist in project
         List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
-            .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
+                .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
         if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
             return result;
@@ -1499,13 +1559,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryAllProcessDefinitionByProjectCode(User loginUser, long projectCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
         List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryAllDefinitionList(projectCode);
-        List<DagData> dagDataList = processDefinitions.stream().map(processService::genDagData).collect(Collectors.toList());
+        List<DagData> dagDataList =
+                processDefinitions.stream().map(processService::genDagData).collect(Collectors.toList());
         result.put(Constants.DATA_LIST, dagDataList);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -1520,7 +1582,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Override
     public Map<String, Object> queryProcessDefinitionListByProjectCode(long projectCode) {
         Map<String, Object> result = new HashMap<>();
-        List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper.queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, null);
+        List<DependentSimplifyDefinition> processDefinitions =
+                processDefinitionMapper.queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, null);
         result.put(Constants.DATA_LIST, processDefinitions);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -1534,19 +1597,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return task definition list in the process definition
      */
     @Override
-    public Map<String, Object> queryTaskDefinitionListByProcessDefinitionCode(long projectCode, Long processDefinitionCode) {
+    public Map<String, Object> queryTaskDefinitionListByProcessDefinitionCode(long projectCode,
+                                                                              Long processDefinitionCode) {
         Map<String, Object> result = new HashMap<>();
 
         Set<Long> definitionCodesSet = new HashSet<>();
         definitionCodesSet.add(processDefinitionCode);
-        List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper.queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet);
+        List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper
+                .queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet);
 
-        //query process task relation
-        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(
-                processDefinitions.get(0).getCode(),
-                processDefinitions.get(0).getVersion());
+        // query process task relation
+        List<ProcessTaskRelation> processTaskRelations =
+                processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(
+                        processDefinitions.get(0).getCode(),
+                        processDefinitions.get(0).getVersion());
 
-        //query task definition log
+        // query task definition log
         List<TaskDefinitionLog> taskDefinitionLogsList = processService.genTaskDefineList(processTaskRelations);
 
         List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>();
@@ -1572,11 +1638,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return tree view json data
      */
     @Override
-    public Map<String, Object> viewTree(User loginUser,long projectCode, long code, Integer limit) {
+    public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
         Map<String, Object> result = new HashMap<>();
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_TREE_VIEW);
+        // check user access for project
+        result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1590,15 +1656,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         // nodes that is running
         Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();
 
-        //nodes that is waiting to run
+        // nodes that is waiting to run
         Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();
 
         // List of process instances
         List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit);
-        processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
-        List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
+        processInstanceList.forEach(processInstance -> processInstance
+                .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
+        List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper
+                .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
         Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
-            .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+                .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
 
         if (limit > processInstanceList.size()) {
             limit = processInstanceList.size();
@@ -1612,9 +1680,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         for (int i = limit - 1; i >= 0; i--) {
             ProcessInstance processInstance = processInstanceList.get(i);
             Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
-            parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(),
-                "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
-                DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
+            parentTreeViewDto.getInstances()
+                    .add(new Instance(processInstance.getId(), processInstance.getName(),
+                            processInstance.getProcessDefinitionCode(),
+                            "", processInstance.getState().toString(), processInstance.getStartTime(), endTime,
+                            processInstance.getHost(),
+                            DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
         }
 
         List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@@ -1624,7 +1695,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             runningNodeMap.put(startNode, parentTreeViewDtoList);
         }
 
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             Set<String> postNodeList;
             Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
             while (iter.hasNext()) {
@@ -1637,10 +1708,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 treeViewDto.setType(taskNode.getType());
                 treeViewDto.setCode(taskNode.getCode());
                 treeViewDto.setName(taskNode.getName());
-                //set treeViewDto instances
+                // set treeViewDto instances
                 for (int i = limit - 1; i >= 0; i--) {
                     ProcessInstance processInstance = processInstanceList.get(i);
-                    TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), Long.parseLong(nodeCode));
+                    TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
+                            Long.parseLong(nodeCode));
                     if (taskInstance == null) {
                         treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
                     } else {
@@ -1654,9 +1726,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                             subProcessCode = Long.parseLong(JSONUtils.parseObject(
                                     taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
                         }
-                        treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
-                                taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
-                                taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
+                        treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(),
+                                taskInstance.getTaskCode(),
+                                taskInstance.getTaskType(), taskInstance.getState().toString(),
+                                taskInstance.getStartTime(), taskInstance.getEndTime(),
+                                taskInstance.getHost(),
+                                DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
                     }
                 }
                 for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@@ -1728,7 +1803,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                           long projectCode,
                                                           String codes,
                                                           long targetProjectCode) {
-        Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode,WORKFLOW_BATCH_COPY);
+        Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode, WORKFLOW_BATCH_COPY);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1755,7 +1830,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                           long projectCode,
                                                           String codes,
                                                           long targetProjectCode) {
-        Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode,TASK_DEFINITION_MOVE);
+        Map<String, Object> result =
+                checkParams(loginUser, projectCode, codes, targetProjectCode, TASK_DEFINITION_MOVE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1772,10 +1848,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     private Map<String, Object> checkParams(User loginUser,
                                             long projectCode,
                                             String processDefinitionCodes,
-                                            long targetProjectCode,String perm) {
+                                            long targetProjectCode, String perm) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,perm);
+        // check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, perm);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1787,8 +1863,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
 
         if (projectCode != targetProjectCode) {
             Project targetProject = projectMapper.queryByCode(targetProjectCode);
-            //check user access for project
-            Map<String, Object> targetResult = projectService.checkProjectAndAuth(loginUser, targetProject, targetProjectCode,perm);
+            // check user access for project
+            Map<String, Object> targetResult =
+                    projectService.checkProjectAndAuth(loginUser, targetProject, targetProjectCode, perm);
             if (targetResult.get(Constants.STATUS) != Status.SUCCESS) {
                 return targetResult;
             }
@@ -1797,21 +1874,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     protected void doBatchOperateProcessDefinition(User loginUser,
-                                                 long targetProjectCode,
-                                                 List<String> failedProcessList,
-                                                 String processDefinitionCodes,
-                                                 Map<String, Object> result,
-                                                 boolean isCopy) {
-        Set<Long> definitionCodes = Arrays.stream(processDefinitionCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
+                                                   long targetProjectCode,
+                                                   List<String> failedProcessList,
+                                                   String processDefinitionCodes,
+                                                   Map<String, Object> result,
+                                                   boolean isCopy) {
+        Set<Long> definitionCodes = Arrays.stream(processDefinitionCodes.split(Constants.COMMA)).map(Long::parseLong)
+                .collect(Collectors.toSet());
         List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(definitionCodes);
-        Set<Long> queryCodes = processDefinitionList.stream().map(ProcessDefinition::getCode).collect(Collectors.toSet());
+        Set<Long> queryCodes =
+                processDefinitionList.stream().map(ProcessDefinition::getCode).collect(Collectors.toSet());
         // definitionCodes - queryCodes
-        Set<Long> diffCode = definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet());
+        Set<Long> diffCode =
+                definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet());
         diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
         for (ProcessDefinition processDefinition : processDefinitionList) {
             List<ProcessTaskRelation> processTaskRelations =
-                processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
-            List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
+                    processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
+                            processDefinition.getCode());
+            List<ProcessTaskRelationLog> taskRelationList =
+                    processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
             processDefinition.setProjectCode(targetProjectCode);
             String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
             if (isCopy) {
@@ -1839,7 +1921,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                         processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode()));
                     }
                     if (processTaskRelationLog.getPostTaskCode() > 0) {
-                        processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
+                        processTaskRelationLog
+                                .setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
                     }
                 }
                 final long oldProcessDefinitionCode = processDefinition.getCode();
@@ -1865,7 +1948,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     }
                     processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
                 }
-                //copy timing configuration
+                // copy timing configuration
                 Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(oldProcessDefinitionCode);
                 if (scheduleObj != null) {
                     scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
@@ -1879,14 +1962,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                     }
                 }
                 try {
-                    result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson));
+                    result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
+                            otherParamsJson));
                 } catch (Exception e) {
                     putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
                     throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
                 }
             } else {
                 try {
-                    result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, Lists.newArrayList(), otherParamsJson));
+                    result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
+                            Lists.newArrayList(), otherParamsJson));
                 } catch (Exception e) {
                     putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
                     throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
@@ -1908,7 +1993,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         StringBuilder newName = new StringBuilder();
         String regex = String.format(".*%s\\d{17}$", suffix);
         if (originalName.matches(regex)) {
-            //replace timestamp of originalName
+            // replace timestamp of originalName
             return newName.append(originalName, 0, originalName.lastIndexOf(suffix))
                     .append(suffix)
                     .append(DateUtils.getCurrentTimeStamp())
@@ -1931,10 +2016,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      */
     @Override
     @Transactional
-    public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) {
+    public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code,
+                                                              int version) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -1945,9 +2032,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             return result;
         }
 
-        ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
+        ProcessDefinitionLog processDefinitionLog =
+                processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
         if (Objects.isNull(processDefinitionLog)) {
-            putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version);
+            putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
+                    processDefinition.getCode(), version);
             return result;
         }
         int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
@@ -1972,9 +2061,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                          Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
         if (!failedProcessList.isEmpty()) {
             if (isCopy) {
-                putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, String.join(",", failedProcessList));
+                putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
+                        String.join(",", failedProcessList));
             } else {
-                putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, String.join(",", failedProcessList));
+                putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
+                        String.join(",", failedProcessList));
             }
         } else {
             putMsg(result, Status.SUCCESS);
@@ -1992,11 +2083,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @return the pagination process definition versions info of the certain process definition
      */
     @Override
-    public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long code) {
+    public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize,
+                                                 long code) {
         Result result = new Result();
         Project project = projectMapper.queryByCode(projectCode);
         // check user access for project
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,VERSION_LIST);
+        Map<String, Object> checkResult =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, VERSION_LIST);
         Status resultStatus = (Status) checkResult.get(Constants.STATUS);
         if (resultStatus != Status.SUCCESS) {
             putMsg(result, resultStatus);
@@ -2004,7 +2097,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
         Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize);
-        IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code, projectCode);
+        IPage<ProcessDefinitionLog> processDefinitionVersionsPaging =
+                processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code, projectCode);
         List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionVersionsPaging.getRecords();
 
         pageInfo.setTotalList(processDefinitionLogs);
@@ -2014,7 +2108,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
-
     /**
      * delete one certain process definition by version number and process definition code
      *
@@ -2026,10 +2119,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      */
     @Override
     @Transactional
-    public Map<String, Object> deleteProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) {
+    public Map<String, Object> deleteProcessDefinitionVersion(User loginUser, long projectCode, long code,
+                                                              int version) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,VERSION_DELETE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, VERSION_DELETE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -2079,8 +2174,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                             String scheduleJson,
                                                             ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_CREATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -2111,8 +2207,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
             return result;
         }
-        ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
-            globalParams, "", timeout, loginUser.getId(), tenantId);
+        ProcessDefinition processDefinition =
+                new ProcessDefinition(projectCode, name, processDefinitionCode, description,
+                        globalParams, "", timeout, loginUser.getId(), tenantId);
         processDefinition.setExecutionType(executionType);
         result = createEmptyDagDefine(loginUser, processDefinition);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
@@ -2145,7 +2242,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         return result;
     }
 
-    protected Map<String, Object> createDagSchedule(User loginUser, ProcessDefinition processDefinition, String scheduleJson) {
+    protected Map<String, Object> createDagSchedule(User loginUser, ProcessDefinition processDefinition,
+                                                    String scheduleJson) {
         Map<String, Object> result = new HashMap<>();
         Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class);
         if (scheduleObj == null) {
@@ -2164,16 +2262,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab());
             return result;
         }
-        scheduleObj.setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType());
+        scheduleObj
+                .setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType());
         scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 : scheduleObj.getWarningGroupId());
-        scheduleObj.setFailureStrategy(scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
+        scheduleObj.setFailureStrategy(
+                scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
         scheduleObj.setCreateTime(now);
         scheduleObj.setUpdateTime(now);
         scheduleObj.setUserId(loginUser.getId());
         scheduleObj.setReleaseState(ReleaseState.OFFLINE);
-        scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM : scheduleObj.getProcessInstancePriority());
+        scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM
+                : scheduleObj.getProcessInstancePriority());
         scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? "default" : scheduleObj.getWorkerGroup());
-        scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode());
+        scheduleObj
+                .setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode());
         scheduleMapper.insert(scheduleObj);
 
         putMsg(result, Status.SUCCESS);
@@ -2211,8 +2313,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                                                                 String otherParamsJson,
                                                                 ProcessExecutionTypeEnum executionType) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_UPDATE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -2249,11 +2352,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 return result;
             }
         }
-        ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
+        ProcessDefinition processDefinitionDeepCopy =
+                JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
         processDefinition.set(projectCode, name, description, globalParams, "", timeout, tenantId);
         processDefinition.setExecutionType(executionType);
-        List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
-        result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, Lists.newArrayList(), otherParamsJson);
+        List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper
+                .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
+        result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy,
+                Lists.newArrayList(), otherParamsJson);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -2272,9 +2378,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     protected Map<String, Object> updateDagSchedule(User loginUser,
-                                                  long projectCode,
-                                                  long processDefinitionCode,
-                                                  String scheduleJson) {
+                                                    long projectCode,
+                                                    long processDefinitionCode,
+                                                    String scheduleJson) {
         Map<String, Object> result = new HashMap<>();
         Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
         if (schedule == null) {
@@ -2282,9 +2388,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
             throw new ServiceException(Status.DATA_IS_NOT_VALID);
         }
         // set default value
-        FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
+        FailureStrategy failureStrategy =
+                schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
         WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
-        Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
+        Priority processInstancePriority =
+                schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
         int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
         String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
         long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
@@ -2296,16 +2404,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         param.setTimezoneId(schedule.getTimezoneId());
 
         return schedulerService.updateScheduleByProcessDefinitionCode(
-            loginUser,
-            projectCode,
-            processDefinitionCode,
-            JSONUtils.toJsonString(param),
-            warningType,
-            warningGroupId,
-            failureStrategy,
-            processInstancePriority,
-            workerGroup,
-            environmentCode);
+                loginUser,
+                projectCode,
+                processDefinitionCode,
+                JSONUtils.toJsonString(param),
+                warningType,
+                warningGroupId,
+                failureStrategy,
+                processInstancePriority,
+                workerGroup,
+                environmentCode);
     }
 
     /**
@@ -2319,10 +2427,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      */
     @Transactional
     @Override
-    public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, ReleaseState releaseState) {
+    public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code,
+                                                          ReleaseState releaseState) {
         Project project = projectMapper.queryByCode(projectCode);
-        //check user access for project
-        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_ONLINE_OFFLINE);
+        // check user access for project
+        Map<String, Object> result =
+                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
@@ -2344,7 +2454,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
         }
         switch (releaseState) {
             case ONLINE:
-                List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
+                List<ProcessTaskRelation> relationList =
+                        processService.findRelationByCode(code, processDefinition.getVersion());
                 if (CollectionUtils.isEmpty(relationList)) {
                     putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
                     return result;
@@ -2357,7 +2468,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                 processDefinition.setReleaseState(releaseState);
                 int updateProcess = processDefinitionMapper.updateById(processDefinition);
                 if (updateProcess > 0) {
-                    logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, scheduleObj.getId(), code);
+                    logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}",
+                            projectCode, scheduleObj.getId(), code);
                     // set status
                     scheduleObj.setReleaseState(ReleaseState.OFFLINE);
                     int updateSchedule = scheduleMapper.updateById(scheduleObj);
@@ -2384,7 +2496,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
      * @param otherParamsJson
      */
     @Override
-    public void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result, String otherParamsJson) {
+    public void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result,
+                                  String otherParamsJson) {
 
     }
 
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index e142747ee7..746eba31c9 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -15,7 +15,6 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
@@ -25,9 +24,9 @@
         <version>dev-SNAPSHOT</version>
     </parent>
     <artifactId>dolphinscheduler-bom</artifactId>
-    <name>${project.artifactId}</name>
     <packaging>pom</packaging>
-
+    <name>${project.artifactId}</name>
+    
     <properties>
         <netty.version>4.1.53.Final</netty.version>
         <spring-boot.version>2.5.6</spring-boot.version>
@@ -89,9 +88,9 @@
         <joda-time.version>2.10.13</joda-time.version>
         <okhttp.version>3.14.9</okhttp.version>
         <json-path.version>2.7.0</json-path.version>
-
+        
     </properties>
-
+    
     <dependencyManagement>
         <dependencies>
             <!-- netty -->
@@ -99,15 +98,15 @@
                 <groupId>io.netty</groupId>
                 <artifactId>netty-bom</artifactId>
                 <version>${netty.version}</version>
-                <scope>import</scope>
                 <type>pom</type>
+                <scope>import</scope>
             </dependency>
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
                 <version>${netty.version}</version>
             </dependency>
-
+            
             <!-- spring -->
             <dependency>
                 <groupId>org.springframework.boot</groupId>
@@ -147,7 +146,7 @@
                 <version>${spring.version}</version>
                 <scope>test</scope>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.java-websocket</groupId>
                 <artifactId>Java-WebSocket</artifactId>
@@ -169,7 +168,7 @@
                 <artifactId>mybatis-plus-annotation</artifactId>
                 <version>${mybatis-plus.version}</version>
             </dependency>
-
+            
             <!-- quartz-->
             <dependency>
                 <groupId>org.quartz-scheduler</groupId>
@@ -181,13 +180,13 @@
                 <artifactId>cron-utils</artifactId>
                 <version>${cron-utils.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>druid</artifactId>
                 <version>${druid.version}</version>
             </dependency>
-
+            
             <!-- Zookeeper -->
             <dependency>
                 <groupId>org.apache.zookeeper</groupId>
@@ -199,8 +198,8 @@
                         <artifactId>slf4j-log4j12</artifactId>
                     </exclusion>
                     <exclusion>
-                        <artifactId>netty</artifactId>
                         <groupId>io.netty</groupId>
+                        <artifactId>netty</artifactId>
                     </exclusion>
                     <exclusion>
                         <groupId>com.github.spotbugs</groupId>
@@ -246,7 +245,7 @@
                 <artifactId>curator-test</artifactId>
                 <version>${curator-test.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>commons-codec</groupId>
                 <artifactId>commons-codec</artifactId>
@@ -282,7 +281,7 @@
                 <artifactId>commons-email</artifactId>
                 <version>${commons-email.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.apache.httpcomponents</groupId>
                 <artifactId>httpclient</artifactId>
@@ -309,7 +308,7 @@
                 <artifactId>jackson-core</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
-
+            
             <!--protostuff-->
             <dependency>
                 <groupId>io.protostuff</groupId>
@@ -321,33 +320,33 @@
                 <artifactId>protostuff-runtime</artifactId>
                 <version>${protostuff.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>net.bytebuddy</groupId>
                 <artifactId>byte-buddy</artifactId>
                 <version>${byte-buddy.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.reflections</groupId>
                 <artifactId>reflections</artifactId>
                 <version>${reflections.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
                 <version>${mysql-connector.version}</version>
                 <scope>test</scope>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.oracle.database.jdbc</groupId>
                 <artifactId>ojdbc8</artifactId>
                 <version>${oracle-jdbc.version}</version>
                 <scope>test</scope>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.h2database</groupId>
                 <artifactId>h2</artifactId>
@@ -375,7 +374,7 @@
                 <artifactId>logback-core</artifactId>
                 <version>${logback.version}</version>
             </dependency>
-
+            
             <!--excel poi-->
             <dependency>
                 <groupId>org.apache.poi</groupId>
@@ -387,7 +386,7 @@
                 <artifactId>poi-ooxml</artifactId>
                 <version>${poi.version}</version>
             </dependency>
-
+            
             <!-- hadoop -->
             <dependency>
                 <groupId>org.apache.hadoop</groupId>
@@ -395,12 +394,12 @@
                 <version>${hadoop.version}</version>
                 <exclusions>
                     <exclusion>
-                        <artifactId>slf4j-log4j12</artifactId>
                         <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
                     </exclusion>
                     <exclusion>
-                        <artifactId>com.sun.jersey</artifactId>
                         <groupId>jersey-json</groupId>
+                        <artifactId>com.sun.jersey</artifactId>
                     </exclusion>
                     <exclusion>
                         <groupId>junit</groupId>
@@ -427,37 +426,37 @@
                 <artifactId>hadoop-yarn-common</artifactId>
                 <version>${hadoop.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-collections4</artifactId>
                 <version>${commons-collections4.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.postgresql</groupId>
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>org.apache.hive</groupId>
                 <artifactId>hive-jdbc</artifactId>
                 <version>${hive-jdbc.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
                 <version>${commons-io.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.github.oshi</groupId>
                 <artifactId>oshi-core</artifactId>
@@ -477,31 +476,31 @@
                     </exclusion>
                 </exclusions>
             </dependency>
-
+            
             <dependency>
                 <groupId>ru.yandex.clickhouse</groupId>
                 <artifactId>clickhouse-jdbc</artifactId>
                 <version>${clickhouse-jdbc.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.microsoft.sqlserver</groupId>
                 <artifactId>mssql-jdbc</artifactId>
                 <version>${mssql-jdbc.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.facebook.presto</groupId>
                 <artifactId>presto-jdbc</artifactId>
                 <version>${presto-jdbc.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>javax.servlet</groupId>
                 <artifactId>servlet-api</artifactId>
                 <version>${servlet-api.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>javax.servlet</groupId>
                 <artifactId>javax.servlet-api</artifactId>
@@ -512,49 +511,49 @@
                 <artifactId>springfox-swagger2</artifactId>
                 <version>${springfox.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>io.springfox</groupId>
                 <artifactId>springfox-swagger-ui</artifactId>
                 <version>${springfox.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>io.swagger</groupId>
                 <artifactId>swagger-models</artifactId>
                 <version>${swagger-models.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.github.xiaoymin</groupId>
                 <artifactId>swagger-bootstrap-ui</artifactId>
                 <version>${swagger.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.github.rholder</groupId>
                 <artifactId>guava-retrying</artifactId>
                 <version>${guava-retry.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>javax.activation</groupId>
                 <artifactId>activation</artifactId>
                 <version>${activation.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.sun.mail</groupId>
                 <artifactId>javax.mail</artifactId>
                 <version>${javax-mail}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>net.sf.py4j</groupId>
                 <artifactId>py4j</artifactId>
                 <version>${py4j.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.google.code.findbugs</groupId>
                 <artifactId>jsr305</artifactId>
@@ -575,7 +574,7 @@
                 <artifactId>error_prone_annotations</artifactId>
                 <version>${error_prone_annotations.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>io.fabric8</groupId>
                 <artifactId>kubernetes-client</artifactId>
@@ -587,7 +586,7 @@
                 <artifactId>hibernate-validator</artifactId>
                 <version>${hibernate-validator.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.amazonaws</groupId>
                 <artifactId>aws-java-sdk-emr</artifactId>
@@ -598,26 +597,26 @@
                 <artifactId>joda-time</artifactId>
                 <version>${joda-time.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.amazonaws</groupId>
                 <artifactId>aws-java-sdk-s3</artifactId>
                 <version>${aws-sdk.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.squareup.okhttp3</groupId>
                 <artifactId>okhttp</artifactId>
                 <version>${okhttp.version}</version>
             </dependency>
-
+            
             <dependency>
                 <groupId>com.jayway.jsonpath</groupId>
                 <artifactId>json-path</artifactId>
                 <version>${json-path.version}</version>
             </dependency>
-
+            
         </dependencies>
-
+        
     </dependencyManagement>
-</project>
\ No newline at end of file
+</project>
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 9faf69c70a..6350761205 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -25,9 +25,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import java.time.Duration;
 import java.util.regex.Pattern;
 
-/**
- * Constants
- */
 public final class Constants {
 
     private Constants() {
@@ -44,13 +41,9 @@ public final class Constants {
      */
     public static final String REGISTRY_DOLPHINSCHEDULER_MASTERS = "/nodes/master";
     public static final String REGISTRY_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
-    public static final String REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS = "/dead-servers";
     public static final String REGISTRY_DOLPHINSCHEDULER_NODE = "/nodes";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
-    public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
-    public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS =
-            "/lock/failover/startup-masters";
 
     public static final String FORMAT_SS = "%s%s";
     public static final String FORMAT_S_S = "%s/%s";
@@ -639,20 +632,20 @@ public final class Constants {
     public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
 
     public static final int[] NOT_TERMINATED_STATES = new int[]{
-            WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-            TaskExecutionStatus.DISPATCH.ordinal(),
-            WorkflowExecutionStatus.RUNNING_EXECUTION.ordinal(),
-            WorkflowExecutionStatus.DELAY_EXECUTION.ordinal(),
-            WorkflowExecutionStatus.READY_PAUSE.ordinal(),
-            WorkflowExecutionStatus.READY_STOP.ordinal(),
-            TaskExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
+            WorkflowExecutionStatus.SUBMITTED_SUCCESS.getCode(),
+            TaskExecutionStatus.DISPATCH.getCode(),
+            WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(),
+            WorkflowExecutionStatus.DELAY_EXECUTION.getCode(),
+            WorkflowExecutionStatus.READY_PAUSE.getCode(),
+            WorkflowExecutionStatus.READY_STOP.getCode(),
+            TaskExecutionStatus.NEED_FAULT_TOLERANCE.getCode(),
     };
 
     public static final int[] RUNNING_PROCESS_STATE = new int[]{
-            TaskExecutionStatus.RUNNING_EXECUTION.ordinal(),
-            TaskExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-            TaskExecutionStatus.DISPATCH.ordinal(),
-            WorkflowExecutionStatus.SERIAL_WAIT.ordinal()
+            TaskExecutionStatus.RUNNING_EXECUTION.getCode(),
+            TaskExecutionStatus.SUBMITTED_SUCCESS.getCode(),
+            TaskExecutionStatus.DISPATCH.getCode(),
+            WorkflowExecutionStatus.SERIAL_WAIT.getCode()
     };
 
     /**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleException.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleException.java
new file mode 100644
index 0000000000..da84b29a56
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.lifecycle;
+
+/**
+ * Checked exception signalling that a server life cycle state change could not be performed.
+ */
+public class ServerLifeCycleException extends Exception {
+
+    /**
+     * Creates the exception with a detail message and the underlying cause.
+     *
+     * @param message the detail message
+     * @param cause   the throwable that triggered this exception
+     */
+    public ServerLifeCycleException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates the exception with a detail message only.
+     *
+     * @param message the detail message
+     */
+    public ServerLifeCycleException(String message) {
+        super(message);
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleManager.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleManager.java
new file mode 100644
index 0000000000..1e2a93f638
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerLifeCycleManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.lifecycle;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Tracks the life cycle status of the current server in a single static field and guards its transitions.
+ * <p>
+ * The status starts as {@link ServerStatus#RUNNING}. It is read through a volatile field, and every method
+ * that changes it is synchronized.
+ */
+@UtilityClass
+public class ServerLifeCycleManager {
+
+    private static volatile ServerStatus serverStatus = ServerStatus.RUNNING;
+
+    /**
+     * @return {@code true} if the current status is {@link ServerStatus#RUNNING}
+     */
+    public static boolean isRunning() {
+        return serverStatus == ServerStatus.RUNNING;
+    }
+
+    /**
+     * @return {@code true} if the current status is {@link ServerStatus#STOPPED}
+     */
+    public static boolean isStopped() {
+        return serverStatus == ServerStatus.STOPPED;
+    }
+
+    /**
+     * @return the current server status
+     */
+    public static ServerStatus getServerStatus() {
+        return serverStatus;
+    }
+
+    /**
+     * Change the current server state to {@link ServerStatus#WAITING}; only a server that is
+     * {@link ServerStatus#RUNNING} can change to {@link ServerStatus#WAITING}.
+     *
+     * @throws ServerLifeCycleException if the server is stopped or is otherwise not running
+     */
+    public static synchronized void toWaiting() throws ServerLifeCycleException {
+        if (isStopped()) {
+            throw new ServerLifeCycleException("The current server is already stopped, cannot change to waiting");
+        }
+
+        if (serverStatus != ServerStatus.RUNNING) {
+            throw new ServerLifeCycleException("The current server is not at running status, cannot change to waiting");
+        }
+        serverStatus = ServerStatus.WAITING;
+    }
+
+    /**
+     * Recover from {@link ServerStatus#WAITING} to {@link ServerStatus#RUNNING}.
+     *
+     * @throws ServerLifeCycleException if the current status is not {@link ServerStatus#WAITING}
+     */
+    public static synchronized void recoverFromWaiting() throws ServerLifeCycleException {
+        if (serverStatus != ServerStatus.WAITING) {
+            throw new ServerLifeCycleException("The current server status is not waiting, cannot recover from waiting");
+        }
+        serverStatus = ServerStatus.RUNNING;
+    }
+
+    /**
+     * Change the current server state to {@link ServerStatus#STOPPED}, whatever the previous state was.
+     *
+     * @return {@code false} if the server was already stopped, {@code true} if this call stopped it
+     */
+    public static synchronized boolean toStopped() {
+        if (serverStatus == ServerStatus.STOPPED) {
+            return false;
+        }
+        serverStatus = ServerStatus.STOPPED;
+        return true;
+    }
+
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerStatus.java
new file mode 100644
index 0000000000..2d2494db53
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/lifecycle/ServerStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.lifecycle;
+
+/**
+ * Lifecycle status of a server (master or worker); each constant carries a numeric code and a description.
+ */
+public enum ServerStatus {
+
+    RUNNING(0, "The current server is running"),
+    WAITING(1, "The current server is waiting, this means it cannot work"),
+    STOPPED(2, "The current server is stopped"),
+    ;
+
+    private final int code;
+    private final String desc;
+
+    ServerStatus(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
deleted file mode 100644
index 777203c0f3..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.thread;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import lombok.experimental.UtilityClass;
-
-/**
- * If the process closes, a signal is placed as true, and all threads get this flag to stop working.
- */
-@UtilityClass
-public class Stopper {
-
-    private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false);
-
-    /**
-     * Return the flag if the Server is stopped.
-     *
-     * @return True, if the server is stopped; False, the server is still running.
-     */
-    public static boolean isStopped() {
-        return stoppedSignal.get();
-    }
-
-    /**
-     * Return the flag if the Server is stopped.
-     *
-     * @return True, if the server is running, False, the server is stopped.
-     */
-    public static boolean isRunning() {
-        return !stoppedSignal.get();
-    }
-
-    /**
-     * Stop the server
-     *
-     * @return True, if the server stopped success. False, if the server is already stopped.
-     */
-    public static boolean stop() {
-        return stoppedSignal.compareAndSet(false, true);
-    }
-}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 1415aaa840..6419a97365 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
@@ -29,9 +29,6 @@ import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import javax.annotation.PostConstruct;
-
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,11 +39,14 @@ import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
+import javax.annotation.PostConstruct;
+
 @SpringBootApplication
 @ComponentScan("org.apache.dolphinscheduler")
 @EnableTransactionManagement
 @EnableCaching
 public class MasterServer implements IStoppable {
+
     private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
 
     @Autowired
@@ -90,7 +90,6 @@ public class MasterServer implements IStoppable {
         this.taskPluginManager.loadPlugin();
 
         // self tolerant
-        this.masterRegistryClient.init();
         this.masterRegistryClient.start();
         this.masterRegistryClient.setRegistryStoppable(this);
 
@@ -103,7 +102,7 @@ public class MasterServer implements IStoppable {
         this.schedulerApi.start();
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            if (Stopper.isRunning()) {
+            if (!ServerLifeCycleManager.isStopped()) {
                 close("MasterServer shutdownHook");
             }
         }));
@@ -117,19 +116,20 @@ public class MasterServer implements IStoppable {
     public void close(String cause) {
         // set stop signal is true
         // execute only once
-        if (!Stopper.stop()) {
+        if (!ServerLifeCycleManager.toStopped()) {
             logger.warn("MasterServer is already stopped, current cause: {}", cause);
             return;
         }
         // thread sleep 3 seconds for thread quietly stop
         ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
-        try (SchedulerApi closedSchedulerApi = schedulerApi;
-             MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
-             MasterRPCServer closedRpcServer = masterRPCServer;
-             MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
-             // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
-             // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
-             SpringApplicationContext closedSpringContext = springApplicationContext) {
+        try (
+                SchedulerApi closedSchedulerApi = schedulerApi;
+                MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
+                MasterRPCServer closedRpcServer = masterRPCServer;
+                MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
+                // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
+                // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+                SpringApplicationContext closedSpringContext = springApplicationContext) {
 
             logger.info("Master server is stopping, current cause : {}", cause);
         } catch (Exception e) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
index 0eb0f09a73..bf5a08b477 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.cache;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 
 import java.util.Collection;
@@ -65,4 +66,6 @@ public interface ProcessInstanceExecCacheManager {
      * @return all WorkflowExecuteThread in cache
      */
     Collection<WorkflowExecuteRunnable> getAll();
+
+    void clearCache();
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
index 8f00029a3e..3588e3d336 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
@@ -38,7 +38,8 @@ import lombok.NonNull;
 @Component
 public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
 
-    private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps =
+            new ConcurrentHashMap<>();
 
     @PostConstruct
     public void registerMetrics() {
@@ -69,4 +70,9 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
     public Collection<WorkflowExecuteRunnable> getAll() {
         return ImmutableList.copyOf(processInstanceExecMaps.values());
     }
+
+    @Override
+    public void clearCache() {
+        processInstanceExecMaps.clear();
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 7f6f124164..26e2cbe02a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -17,26 +17,32 @@
 
 package org.apache.dolphinscheduler.server.master.config;
 
+import lombok.Data;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
 import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-
-import java.time.Duration;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.validation.Errors;
 import org.springframework.validation.Validator;
 import org.springframework.validation.annotation.Validated;
 
-import lombok.Data;
+import java.time.Duration;
+
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
 
 @Data
 @Validated
 @Configuration
 @ConfigurationProperties(prefix = "master")
 public class MasterConfig implements Validator {
+
+    private Logger logger = LoggerFactory.getLogger(MasterConfig.class);
+
     /**
      * The master RPC server listen port.
      */
@@ -67,10 +73,6 @@ public class MasterConfig implements Validator {
      * Master heart beat task execute interval.
      */
     private Duration heartbeatInterval = Duration.ofSeconds(10);
-    /**
-     * Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
-     */
-    private int heartbeatErrorThreshold = 5;
     /**
      * task submit max retry times.
      */
@@ -87,11 +89,14 @@ public class MasterConfig implements Validator {
     private double reservedMemory = 0.3;
     private Duration failoverInterval = Duration.ofMinutes(10);
     private boolean killYarnJobWhenTaskFailover = true;
-    /**
-     * ip:listenPort
-     */
+    private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
+
+    // ip:listenPort
     private String masterAddress;
 
+    // /nodes/master/ip:listenPort
+    private String masterRegistryNodePath;
+
     @Override
     public boolean supports(Class<?> clazz) {
         return MasterConfig.class.isAssignableFrom(clazz);
@@ -133,9 +138,29 @@ public class MasterConfig implements Validator {
         if (masterConfig.getMaxCpuLoadAvg() <= 0) {
             masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
-        if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
-            errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
-        }
         masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
+        masterConfig
+                .setMasterRegistryNodePath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
+        printConfig();
+    }
+
+    private void printConfig() {
+        logger.info("Master config: listenPort -> {} ", listenPort);
+        logger.info("Master config: fetchCommandNum -> {} ", fetchCommandNum);
+        logger.info("Master config: preExecThreads -> {} ", preExecThreads);
+        logger.info("Master config: execThreads -> {} ", execThreads);
+        logger.info("Master config: dispatchTaskNumber -> {} ", dispatchTaskNumber);
+        logger.info("Master config: hostSelector -> {} ", hostSelector);
+        logger.info("Master config: heartbeatInterval -> {} ", heartbeatInterval);
+        logger.info("Master config: taskCommitRetryTimes -> {} ", taskCommitRetryTimes);
+        logger.info("Master config: taskCommitInterval -> {} ", taskCommitInterval);
+        logger.info("Master config: stateWheelInterval -> {} ", stateWheelInterval);
+        logger.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg);
+        logger.info("Master config: reservedMemory -> {} ", reservedMemory);
+        logger.info("Master config: failoverInterval -> {} ", failoverInterval);
+        logger.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
+        logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
+        logger.info("Master config: masterAddress -> {} ", masterAddress);
+        logger.info("Master config: masterRegistryNodePath -> {} ", masterRegistryNodePath);
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index a346142a74..7bff4e9d21 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -17,9 +17,10 @@
 
 package org.apache.dolphinscheduler.server.master.consumer;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -39,9 +40,12 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriority;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -51,13 +55,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
 /**
  * TaskUpdateQueue consumer
  */
@@ -116,7 +113,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
 
     @PostConstruct
     public void init() {
-        this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
+        this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils
+                .newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
         logger.info("Task priority queue consume thread staring");
         super.start();
         logger.info("Task priority queue consume thread started");
@@ -125,7 +123,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     @Override
     public void run() {
         int fetchTaskNum = masterConfig.getDispatchTaskNumber();
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
 
@@ -189,23 +187,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
         boolean result = false;
         try {
             WorkflowExecuteRunnable workflowExecuteRunnable =
-                processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
+                    processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
             if (workflowExecuteRunnable == null) {
                 logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
                 return true;
             }
             Optional<TaskInstance> taskInstanceOptional =
-                workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
+                    workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
             if (!taskInstanceOptional.isPresent()) {
                 logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
-                    taskPriority);
+                        taskPriority);
                 // we return true, so that we will drop this task.
                 return true;
             }
             TaskInstance taskInstance = taskInstanceOptional.get();
             TaskExecutionContext context = taskPriority.getTaskExecutionContext();
             ExecutionContext executionContext =
-                new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
+                    new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
+                            taskInstance);
 
             if (isTaskNeedToCheck(taskPriority)) {
                 if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
@@ -218,13 +217,13 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
 
             if (result) {
                 logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
-                    taskPriority.getTaskId(),
-                    executionContext.getHost());
+                        taskPriority.getTaskId(),
+                        executionContext.getHost());
                 addDispatchEvent(context, executionContext);
             } else {
                 logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
-                    taskPriority.getTaskId(),
-                    executionContext.getHost());
+                        taskPriority.getTaskId(),
+                        executionContext.getHost());
             }
         } catch (RuntimeException | ExecuteException e) {
             logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
@@ -236,16 +235,17 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
      * add dispatch event
      */
     private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
-        TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
+        TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(),
+                executionContext.getHost().getAddress());
         taskEventService.addEvent(taskEvent);
     }
 
     private Command toCommand(TaskExecutionContext taskExecutionContext) {
         // todo: we didn't set the host here, since right now we didn't need to retry this message.
         TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
-                                                                     masterConfig.getMasterAddress(),
-                                                                     taskExecutionContext.getHost(),
-                                                                     System.currentTimeMillis());
+                masterConfig.getMasterAddress(),
+                taskExecutionContext.getHost(),
+                System.currentTimeMillis());
         return requestCommand.convert2Command();
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index e273e8f8aa..25b8037dd6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -61,15 +61,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
     @Autowired
     private ServerNodeManager serverNodeManager;
 
-    @Autowired
-    private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
-
     @Autowired
     private TaskKillResponseProcessor taskKillResponseProcessor;
 
-    @Autowired
-    private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
-
     @Autowired
     private TaskRecallProcessor taskRecallProcessor;
 
@@ -115,7 +109,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
                 doExecute(host, command);
                 success = true;
                 context.setHost(host);
-                // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
+                // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be
+                // failovered, due to the taskInstance's host
                 // is not belongs to the down worker ISSUE-10842.
                 context.getTaskInstance().setHost(host.getAddress());
             } catch (ExecuteException ex) {
@@ -197,7 +192,4 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
         return nodes;
     }
 
-    public NettyRemotingClient getNettyRemotingClient() {
-        return nettyRemotingClient;
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
index 86c8b90cfa..2c6e647d3e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
@@ -45,4 +45,7 @@ public class WorkflowEventQueue {
         return workflowEventQueue.take();
     }
 
+    public void clearWorkflowEventQueue() {
+        workflowEventQueue.clear();
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index 282219b8ec..b30c1dfa37 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -18,8 +18,8 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -112,7 +112,7 @@ public class StateEventResponseService {
         @Override
         public void run() {
             logger.info("State event loop service started");
-            while (Stopper.isRunning()) {
+            while (!ServerLifeCycleManager.isStopped()) {
                 try {
                     // if not task , blocking here
                     StateEvent stateEvent = eventQueue.take();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index d4b97c09c7..878b8bc6d3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -18,23 +18,21 @@
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
 /**
  * task manager
  */
@@ -105,13 +103,14 @@ public class TaskEventService {
      * Dispatch event to target task runnable.
      */
     class TaskEventDispatchThread extends BaseDaemonThread {
+
         protected TaskEventDispatchThread() {
             super("TaskEventLoopThread");
         }
 
         @Override
         public void run() {
-            while (Stopper.isRunning()) {
+            while (!ServerLifeCycleManager.isStopped()) {
                 try {
                     // if not task event, blocking here
                     TaskEvent taskEvent = eventQueue.take();
@@ -139,7 +138,7 @@ public class TaskEventService {
         @Override
         public void run() {
             logger.info("event handler thread started");
-            while (Stopper.isRunning()) {
+            while (!ServerLifeCycleManager.isStopped()) {
                 try {
                     taskExecuteThreadPool.eventHandler();
                     TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
new file mode 100644
index 0000000000..4cecadce16
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
+
+public interface MasterConnectStrategy extends ConnectStrategy {
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
index d0b86fc660..1885d82430 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
@@ -17,44 +17,45 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import lombok.NonNull;
-
 public class MasterConnectionStateListener implements ConnectionListener {
 
     private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
 
-    private final String masterNodePath;
+    private final MasterConfig masterConfig;
     private final RegistryClient registryClient;
+    private final MasterConnectStrategy masterConnectStrategy;
 
-    public MasterConnectionStateListener(@NonNull String masterNodePath, @NonNull RegistryClient registryClient) {
-        this.masterNodePath = masterNodePath;
+    public MasterConnectionStateListener(@NonNull MasterConfig masterConfig,
+                                         @NonNull RegistryClient registryClient,
+                                         @NonNull MasterConnectStrategy masterConnectStrategy) {
+        this.masterConfig = masterConfig;
         this.registryClient = registryClient;
+        this.masterConnectStrategy = masterConnectStrategy;
     }
 
     @Override
     public void onUpdate(ConnectionState state) {
+        logger.info("Master received a {} event from registry, the current server state is {}", state,
+                ServerLifeCycleManager.getServerStatus());
         switch (state) {
             case CONNECTED:
-                logger.debug("registry connection state is {}", state);
                 break;
             case SUSPENDED:
-                logger.warn("registry connection state is {}, ready to retry connection", state);
                 break;
             case RECONNECTED:
-                logger.debug("registry connection state is {}, clean the node info", state);
-                registryClient.remove(masterNodePath);
-                registryClient.persistEphemeral(masterNodePath, "");
+                masterConnectStrategy.reconnect();
                 break;
             case DISCONNECTED:
-                logger.warn("registry connection state is {}, ready to stop myself", state);
-                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
+                masterConnectStrategy.disconnect();
                 break;
             default:
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 722a322288..63938bd928 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,11 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.Constants;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -32,21 +29,18 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.service.FailoverService;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 import java.time.Duration;
-import java.util.Collections;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import com.google.common.collect.Sets;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 /**
  * <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
@@ -55,48 +49,36 @@ import com.google.common.collect.Sets;
 @Component
 public class MasterRegistryClient implements AutoCloseable {
 
-    /**
-     * logger
-     */
     private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
 
-    /**
-     * failover service
-     */
     @Autowired
     private FailoverService failoverService;
 
     @Autowired
     private RegistryClient registryClient;
 
-    /**
-     * master config
-     */
     @Autowired
     private MasterConfig masterConfig;
 
-    /**
-     * heartbeat executor
-     */
+    @Autowired
+    private MasterConnectStrategy masterConnectStrategy;
+
     private ScheduledExecutorService heartBeatExecutor;
 
     /**
      * master startup time, ms
      */
     private long startupTime;
-    private String masterAddress;
-
-    public void init() {
-        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
-        this.startupTime = System.currentTimeMillis();
-        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
-    }
 
     public void start() {
         try {
+            this.startupTime = System.currentTimeMillis();
+            this.heartBeatExecutor =
+                    Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
             // master registry
             registry();
-            registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient));
+            registryClient.addConnectionStateListener(
+                    new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
             throw new RegistryException("Master registry client start up error", e);
@@ -137,11 +119,8 @@ public class MasterRegistryClient implements AutoCloseable {
         try {
             if (!registryClient.exists(path)) {
                 logger.info("path: {} not exists", path);
-                // handle dead server
-                registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
             }
-
-            //failover server
+            // failover server
             if (failover) {
                 failoverService.failoverServerWhenDown(serverHost, nodeType);
             }
@@ -169,11 +148,9 @@ public class MasterRegistryClient implements AutoCloseable {
                 }
                 if (!registryClient.exists(path)) {
                     logger.info("path: {} not exists", path);
-                    // handle dead server
-                    registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
                 }
             }
-            //failover server
+            // failover server
             if (failover) {
                 failoverService.failoverServerWhenDown(serverHost, nodeType);
             }
@@ -186,16 +163,14 @@ public class MasterRegistryClient implements AutoCloseable {
      * Registry the current master server itself to registry.
      */
     void registry() {
-        logger.info("Master node : {} registering to registry center", masterAddress);
-        String localNodePath = getCurrentNodePath();
+        logger.info("Master node : {} registering to registry center", masterConfig.getMasterAddress());
+        String localNodePath = masterConfig.getMasterRegistryNodePath();
         Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                                                        masterConfig.getMaxCpuLoadAvg(),
-                                                        masterConfig.getReservedMemory(),
-                                                        Sets.newHashSet(localNodePath),
-                                                        Constants.MASTER_TYPE,
-                                                        registryClient,
-                                                        masterConfig.getHeartbeatErrorThreshold());
+                masterConfig.getMaxCpuLoadAvg(),
+                masterConfig.getReservedMemory(),
+                Sets.newHashSet(localNodePath),
+                registryClient);
 
         // remove before persist
         registryClient.remove(localNodePath);
@@ -209,19 +184,17 @@ public class MasterRegistryClient implements AutoCloseable {
         // sleep 1s, waiting master failover remove
         ThreadUtils.sleep(SLEEP_TIME_MILLIS);
 
-        // delete dead server
-        registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
-
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS);
-        logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
+        this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(),
+                TimeUnit.SECONDS);
+        logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s",
+                masterConfig.getMasterAddress(), masterHeartbeatInterval.getSeconds());
 
     }
 
     public void deregister() {
         try {
-            String localNodePath = getCurrentNodePath();
-            registryClient.remove(localNodePath);
-            logger.info("Master node : {} unRegistry to register center.", masterAddress);
+            registryClient.remove(masterConfig.getMasterRegistryNodePath());
+            logger.info("Master node : {} unRegistry to register center.", masterConfig.getMasterAddress());
             heartBeatExecutor.shutdown();
             logger.info("MasterServer heartbeat executor shutdown");
             registryClient.close();
@@ -230,11 +203,4 @@ public class MasterRegistryClient implements AutoCloseable {
         }
     }
 
-    /**
-     * get master path
-     */
-    private String getCurrentNodePath() {
-        return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterAddress;
-    }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
new file mode 100644
index 0000000000..1b1f2c84a8
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterStopStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+/**
+ * This strategy will stop the master server, when disconnected from {@link org.apache.dolphinscheduler.registry.api.Registry}.
+ */
+@Service
+@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
+public class MasterStopStrategy implements MasterConnectStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(MasterStopStrategy.class);
+
+    @Autowired
+    private RegistryClient registryClient;
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Override
+    public void disconnect() {
+        registryClient.getStoppable()
+                .stop("Master disconnected from registry, will stop myself due to the stop strategy");
+    }
+
+    @Override
+    public void reconnect() {
+        logger.warn("The current connect strategy is stop, so the master will not reconnect to registry");
+    }
+
+    @Override
+    public StrategyType getStrategyType() {
+        return StrategyType.STOP;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
new file mode 100644
index 0000000000..654d96f25b
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterWaitingStrategy.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
+import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+
+/**
+ * This strategy will change the server status to {@link ServerStatus#WAITING} when disconnect from {@link Registry}.
+ */
+@Service
+@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
+public class MasterWaitingStrategy implements MasterConnectStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(MasterWaitingStrategy.class);
+
+    @Autowired
+    private MasterConfig masterConfig;
+    @Autowired
+    private RegistryClient registryClient;
+    @Autowired
+    private MasterRPCServer masterRPCServer;
+    @Autowired
+    private WorkflowEventQueue workflowEventQueue;
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+    @Autowired
+    private StateWheelExecuteThread stateWheelExecuteThread;
+
+    @Override
+    public void disconnect() {
+        try {
+            ServerLifeCycleManager.toWaiting();
+            // todo: clear the current resource
+            clearMasterResource();
+            Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
+            try {
+                logger.info("Master disconnect from registry will try to reconnect in {} s",
+                        maxWaitingTime.getSeconds());
+                registryClient.connectUntilTimeout(maxWaitingTime);
+            } catch (RegistryException ex) {
+                throw new ServerLifeCycleException(
+                        String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
+            }
+        } catch (ServerLifeCycleException e) {
+            String errorMessage = String.format(
+                    "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
+                    ServerLifeCycleManager.getServerStatus());
+            logger.error(errorMessage, e);
+            registryClient.getStoppable().stop(errorMessage);
+        } catch (RegistryException ex) {
+            String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
+            logger.error(errorMessage, ex);
+            registryClient.getStoppable().stop(errorMessage);
+        } catch (Exception ex) {
+            String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
+            logger.error(errorMessage, ex);
+            registryClient.getStoppable().stop(errorMessage);
+        }
+    }
+
+    @Override
+    public void reconnect() {
+        try {
+            ServerLifeCycleManager.recoverFromWaiting();
+            reStartMasterResource();
+            // reopen the resource
+            logger.info("Recover from waiting success, the current server status is {}",
+                    ServerLifeCycleManager.getServerStatus());
+        } catch (Exception e) {
+            String errorMessage =
+                    String.format("Recover from waiting failed, the current server status is %s, will stop the server",
+                            ServerLifeCycleManager.getServerStatus());
+            logger.error(errorMessage, e);
+            registryClient.getStoppable().stop(errorMessage);
+        }
+    }
+
+    @Override
+    public StrategyType getStrategyType() {
+        return StrategyType.WAITING;
+    }
+
+    private void clearMasterResource() {
+        // close the master resources; if closing fails the master server should be stopped
+        masterRPCServer.close();
+        logger.warn("Master closed RPC server due to lost registry connection");
+        workflowEventQueue.clearWorkflowEventQueue();
+        logger.warn("Master clear workflow event queue due to lost registry connection");
+        processInstanceExecCacheManager.clearCache();
+        logger.warn("Master clear process instance cache due to lost registry connection");
+        stateWheelExecuteThread.clearAllTasks();
+        logger.warn("Master clear all state wheel task due to lost registry connection");
+
+    }
+
+    private void reStartMasterResource() {
+        // reopen the master resources; if reopening fails the master server should be stopped
+        masterRPCServer.start();
+        logger.warn("Master restarted RPC server due to reconnect to registry");
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index c2a9de0446..7bdebe1ee0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -82,8 +82,8 @@ public class MasterRPCServer implements AutoCloseable {
     @Autowired
     private TaskExecuteStartProcessor taskExecuteStartProcessor;
 
-    @PostConstruct
-    private void init() {
+    public void start() {
+        logger.info("Starting Master RPC Server...");
         // init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(masterConfig.getListenPort());
@@ -106,11 +106,6 @@ public class MasterRPCServer implements AutoCloseable {
         this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
 
         this.nettyRemotingServer.start();
-    }
-
-    public void start() {
-        logger.info("Starting Master RPC Server...");
-        this.nettyRemotingServer.start();
         logger.info("Started Master RPC Server...");
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 24a23485ff..14b0252d7c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -18,8 +18,8 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
@@ -61,7 +61,7 @@ public class EventExecuteService extends BaseDaemonThread {
 
     @Override
     public void run() {
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 workflowEventHandler();
                 streamTaskEventHandler();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 16656c8760..5546b474d7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -18,13 +18,11 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.service.FailoverService;
 import org.apache.dolphinscheduler.server.master.service.MasterFailoverService;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -60,8 +58,11 @@ public class FailoverExecuteThread extends BaseDaemonThread {
         // when startup, wait 10s for ready
         ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
 
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
+                if (!ServerLifeCycleManager.isRunning()) {
+                    continue;
+                }
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 masterFailoverService.checkMasterFailover();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index aa0542cb7c..5084c3d1cc 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -17,10 +17,11 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.SlotCheckState;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -40,8 +41,10 @@ import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
 import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,11 +52,6 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 /**
  * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
@@ -104,7 +102,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
      * constructor of MasterSchedulerService
      */
     public void init() {
-        this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
+        this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils
+                .newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
     }
 
@@ -127,11 +126,17 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
      */
     @Override
     public void run() {
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
+                if (!ServerLifeCycleManager.isRunning()) {
+                    // the current server is not at running status, cannot consume command.
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                    // skip this round, otherwise the loop would fall through and still consume commands
+                    continue;
+                }
                 // todo: if the workflow event queue is much, we need to handle the back pressure
                 boolean isOverload =
-                    OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
+                        OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
                 if (isOverload) {
                     MasterServerMetrics.incMasterOverload();
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -156,18 +159,19 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
                     try {
                         LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                         if (processInstanceExecCacheManager.contains(processInstance.getId())) {
-                            logger.error("The workflow instance is already been cached, this case shouldn't be happened");
+                            logger.error(
+                                    "The workflow instance is already been cached, this case shouldn't be happened");
                         }
                         WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
-                                                                                               processService,
-                                                                                               nettyExecutorManager,
-                                                                                               processAlertManager,
-                                                                                               masterConfig,
-                                                                                               stateWheelExecuteThread,
-                                                                                               curingGlobalParamsService);
+                                processService,
+                                nettyExecutorManager,
+                                processAlertManager,
+                                masterConfig,
+                                stateWheelExecuteThread,
+                                curingGlobalParamsService);
                         processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
                         workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
-                                                                      processInstance.getId()));
+                                processInstance.getId()));
                     } finally {
                         LoggerUtils.removeWorkflowInstanceIdMDC();
                     }
@@ -186,24 +190,28 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
 
     private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
         long commandTransformStartTime = System.currentTimeMillis();
-        logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}", commands.size());
+        logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}",
+                commands.size());
         List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
         CountDownLatch latch = new CountDownLatch(commands.size());
         for (final Command command : commands) {
             masterPrepareExecService.execute(() -> {
                 try {
                     // Note: this check is not safe, the slot may change after command transform.
-                    // We use the database transaction in `handleCommand` so that we can guarantee the command will always be executed
+                    // We use the database transaction in `handleCommand` so that we can guarantee the command will
+                    // always be executed
                     // by only one master
                     SlotCheckState slotCheckState = slotCheck(command);
                     if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
-                        logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
+                        logger.info("Master handle command {} skip, slot check state: {}", command.getId(),
+                                slotCheckState);
                         return;
                     }
                     ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
                     if (processInstance != null) {
                         processInstances.add(processInstance);
-                        logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
+                        logger.info("Master handle command {} end, create process instance {}", command.getId(),
+                                processInstance.getId());
                     }
                 } catch (Exception e) {
                     logger.error("Master handle command {} error ", command.getId(), e);
@@ -216,9 +224,11 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
 
         // make sure to finish handling command each time before next scan
         latch.await();
-        logger.info("Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
-            commands.size(), processInstances.size());
-        ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
+        logger.info(
+                "Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
+                commands.size(), processInstances.size());
+        ProcessInstanceMetrics
+                .recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
         return processInstances;
     }
 
@@ -233,10 +243,12 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
             }
             int pageNumber = 0;
             int pageSize = masterConfig.getFetchCommandNum();
-            final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+            final List<Command> result =
+                    processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
             if (CollectionUtils.isNotEmpty(result)) {
-                logger.info("Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
-                    result.size(), thisMasterSlot, masterCount);
+                logger.info(
+                        "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
+                        result.size(), thisMasterSlot, masterCount);
             }
             ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
             return result;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 9ea9b6574b..3c82a8adc9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -21,9 +21,9 @@ import lombok.NonNull;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -97,7 +97,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
     @Override
     public void run() {
         final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 checkTask4Timeout();
                 checkTask4Retry();
@@ -235,6 +235,13 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         logger.info("Removed task instance from state check list");
     }
 
+    public void clearAllTasks() {
+        processInstanceTimeoutCheckList.clear();
+        taskInstanceTimeoutCheckList.clear();
+        taskInstanceRetryCheckList.clear();
+        taskInstanceStateCheckList.clear();
+    }
+
     private void checkTask4Timeout() {
         if (taskInstanceTimeoutCheckList.isEmpty()) {
             return;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
index ee2e70bfd0..37e6143406 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
@@ -18,8 +18,8 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
@@ -28,18 +28,16 @@ import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleExcept
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.PostConstruct;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @Component
 public class WorkflowEventLooper extends BaseDaemonThread {
 
@@ -59,8 +57,9 @@ public class WorkflowEventLooper extends BaseDaemonThread {
 
     @PostConstruct
     public void init() {
-        workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
-                                                                                             workflowEventHandler));
+        workflowEventHandlerList.forEach(
+                workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
+                        workflowEventHandler));
     }
 
     @Override
@@ -72,13 +71,13 @@ public class WorkflowEventLooper extends BaseDaemonThread {
 
     public void run() {
         WorkflowEvent workflowEvent = null;
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 workflowEvent = workflowEventQueue.poolEvent();
                 LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
                 logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
                 WorkflowEventHandler workflowEventHandler =
-                    workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
+                        workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
                 workflowEventHandler.handleWorkflowEvent(workflowEvent);
             } catch (InterruptedException e) {
                 logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
@@ -86,17 +85,17 @@ public class WorkflowEventLooper extends BaseDaemonThread {
                 break;
             } catch (WorkflowEventHandleException workflowEventHandleException) {
                 logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
-                    workflowEvent, workflowEventHandleException);
+                        workflowEvent, workflowEventHandleException);
                 workflowEventQueue.addEvent(workflowEvent);
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (WorkflowEventHandleError workflowEventHandleError) {
                 logger.error("Handle workflow event error, will drop this event, event: {}",
-                             workflowEvent,
-                             workflowEventHandleError);
+                        workflowEvent,
+                        workflowEventHandleError);
             } catch (Exception unknownException) {
                 logger.error(
-                    "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
-                    workflowEvent, unknownException);
+                        "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
+                        workflowEvent, unknownException);
                 workflowEventQueue.addEvent(workflowEvent);
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index 9f191ccd25..d4a82e1afc 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -98,8 +98,6 @@ master:
   host-selector: lower_weight
   # master heartbeat interval
   heartbeat-interval: 10s
-  # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
-  heartbeat-error-threshold: 5
   # master commit task retry times
   task-commit-retry-times: 5
   # master commit task interval
@@ -113,6 +111,11 @@ master:
   failover-interval: 10m
   # kill yarn jon when failover taskInstance, default true
   kill-yarn-job-when-task-failover: true
+  registry-disconnect-strategy:
+    # The strategy to apply when the connection to the registry is lost: stop or waiting
+    strategy: waiting
+    # The max waiting time to reconnect to registry if you set the strategy to waiting
+    max-waiting-time: 100s
 
 server:
   port: 5679
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
index 66b25eb351..b4796b05b0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
@@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -35,11 +35,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameter
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-
-import java.time.Duration;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,6 +42,10 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.springframework.context.ApplicationContext;
 
+import java.time.Duration;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * DependentTaskTest
  */
@@ -298,7 +297,7 @@ public class DependentTaskTest {
     @Test
     public void testWaitAndCancel() {
         // for the poor independence of UT, error on other place may causes the condition happens
-        if (!Stopper.isRunning()) {
+        if (!ServerLifeCycleManager.isRunning()) {
             return;
         }
 
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
index 9b4fe3be54..27357d3c05 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
@@ -18,9 +18,9 @@
 package org.apache.dolphinscheduler.server.master;
 
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -50,7 +50,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Stopper.class})
+@PrepareForTest({ServerLifeCycleManager.class})
 public class SubProcessTaskTest {
 
     /**
@@ -73,8 +73,8 @@ public class SubProcessTaskTest {
         config.setTaskCommitRetryTimes(3);
         config.setTaskCommitInterval(Duration.ofSeconds(1));
 
-        PowerMockito.mockStatic(Stopper.class);
-        PowerMockito.when(Stopper.isRunning()).thenReturn(true);
+        PowerMockito.mockStatic(ServerLifeCycleManager.class);
+        PowerMockito.when(ServerLifeCycleManager.isStopped()).thenReturn(false);
 
         processService = Mockito.mock(ProcessService.class);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 23cf9ed915..4b5bf87f66 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -347,7 +347,7 @@ public class TaskPriorityQueueConsumerTest {
 
     @After
     public void close() {
-        Stopper.stop();
+        ServerLifeCycleManager.toStopped();
     }
 
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
index fa80e8344a..162c43506d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -62,15 +62,15 @@ public class ExecutorDispatcherTest {
         serverConfig.setListenPort(port);
         NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
         nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, Mockito.mock(
-            TaskDispatchProcessor.class));
+                TaskDispatchProcessor.class));
         nettyRemotingServer.start();
         //
         workerConfig.setListenPort(port);
-        workerRegistryClient.registry();
+        workerRegistryClient.start();
 
         ExecutionContext executionContext = ExecutionContextTestUtils.getExecutionContext(port);
         executorDispatcher.dispatch(executionContext);
 
-        workerRegistryClient.unRegistry();
+        workerRegistryClient.close();
     }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 2a4b4c33ff..cc17d22df7 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -80,7 +80,6 @@ public class MasterRegistryClientTest {
         given(registryClient.getStoppable()).willReturn(cause -> {
 
         });
-        doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
         ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
 
         ProcessInstance processInstance = new ProcessInstance();
@@ -89,13 +88,15 @@ public class MasterRegistryClientTest {
         processInstance.setRestartTime(new Date());
         processInstance.setHistoryCmd("xxx");
         processInstance.setCommandType(CommandType.STOP);
-        given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
+        given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
+                .willReturn(Arrays.asList(processInstance));
         doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setId(1);
         taskInstance.setStartTime(new Date());
         taskInstance.setHost("127.0.0.1:8080");
-        given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
+        given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
+                .willReturn(Arrays.asList(taskInstance));
         given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
         given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
         Server server = new Server();
@@ -115,7 +116,7 @@ public class MasterRegistryClientTest {
     public void removeNodePathTest() {
         masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
         masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true);
-        //Cannot mock static methods
+        // Cannot mock static methods
         masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true);
     }
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index c346cdda52..e0c3c94714 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -129,8 +129,6 @@ public class FailoverServiceTest {
         given(registryClient.getStoppable()).willReturn(cause -> {
         });
         given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
-        doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class),
-                Mockito.anyString());
 
         processInstance = new ProcessInstance();
         processInstance.setId(1);
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
new file mode 100644
index 0000000000..448a46841e
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * This interface defines the methods to be executed when the server disconnects from, or reconnects to, the registry.
+ */
+public interface ConnectStrategy {
+
+    void disconnect();
+
+    void reconnect();
+
+    StrategyType getStrategyType();
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
new file mode 100644
index 0000000000..e76cd57181
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectStrategyProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import lombok.Data;
+
+import java.time.Duration;
+
+@Data
+public class ConnectStrategyProperties {
+
+    private StrategyType strategy = StrategyType.STOP;
+
+    private Duration maxWaitingTime = Duration.ofSeconds(0);
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
index 7dfa478ea3..c1c0098732 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -19,11 +19,22 @@
 
 package org.apache.dolphinscheduler.registry.api;
 
+import lombok.NonNull;
+
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.Collection;
 
 public interface Registry extends Closeable {
+
+    /**
+     * Connects to the registry, waiting at most the given timeout for the connection to be established.
+     *
+     * @param timeout the maximum time to wait; if timeout <= 0, waits indefinitely.
+     * @throws RegistryException if the connection cannot be established within the given timeout
+     */
+    void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException;
+
     boolean subscribe(String path, SubscribeListener listener);
 
     void unsubscribe(String path);
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
new file mode 100644
index 0000000000..214177ec44
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/StrategyType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+public enum StrategyType {
+
+    STOP,
+    WAITING,
+    ;
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
index dee884a12d..c093ca92a4 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/MysqlRegistry.java
@@ -17,25 +17,24 @@
 
 package org.apache.dolphinscheduler.plugin.registry.mysql;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.plugin.registry.mysql.task.EphemeralDateManager;
 import org.apache.dolphinscheduler.plugin.registry.mysql.task.RegistryLockManager;
 import org.apache.dolphinscheduler.plugin.registry.mysql.task.SubscribeDataManager;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.registry.api.Registry;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
-
-import java.sql.SQLException;
-import java.time.Duration;
-import java.util.Collection;
-
-import javax.annotation.PostConstruct;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.Collection;
 
 /**
  * This is one of the implementation of {@link Registry}, with this implementation, you need to rely on mysql database to
@@ -47,6 +46,7 @@ public class MysqlRegistry implements Registry {
 
     private static Logger LOGGER = LoggerFactory.getLogger(MysqlRegistry.class);
 
+    private final MysqlRegistryProperties mysqlRegistryProperties;
     private final EphemeralDateManager ephemeralDateManager;
     private final SubscribeDataManager subscribeDataManager;
     private final RegistryLockManager registryLockManager;
@@ -56,6 +56,7 @@ public class MysqlRegistry implements Registry {
         this.mysqlOperator = new MysqlOperator(mysqlRegistryProperties);
         mysqlOperator.clearExpireLock();
         mysqlOperator.clearExpireEphemeralDate();
+        this.mysqlRegistryProperties = mysqlRegistryProperties;
         this.ephemeralDateManager = new EphemeralDateManager(mysqlRegistryProperties, mysqlOperator);
         this.subscribeDataManager = new SubscribeDataManager(mysqlRegistryProperties, mysqlOperator);
         this.registryLockManager = new RegistryLockManager(mysqlRegistryProperties, mysqlOperator);
@@ -72,6 +73,27 @@ public class MysqlRegistry implements Registry {
         LOGGER.info("Started Mysql Registry...");
     }
 
+    /**
+     * Blocks the calling thread until the mysql registry reports {@link ConnectionState#CONNECTED}
+     * or the given timeout elapses. The connection state is polled once per term-refresh interval.
+     *
+     * @param timeout maximum time to wait; a zero or negative duration means wait without a deadline
+     * @throws RegistryException if the deadline passes before the registry is connected, or if the
+     *                           waiting thread is interrupted (the interrupt flag is restored first)
+     */
+    @Override
+    public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException {
+        long beginTimeMillis = System.currentTimeMillis();
+        long endTimeMills;
+        if (timeout.isZero() || timeout.isNegative()) {
+            // No deadline requested.
+            endTimeMills = Long.MAX_VALUE;
+        } else {
+            // Decide on the whole duration rather than getSeconds(): a sub-second timeout such as
+            // 500ms has getSeconds() == 0 and must not be mistaken for "wait forever".
+            long timeoutMillis = timeout.toMillis();
+            // Saturate instead of overflowing when the timeout is extremely large.
+            endTimeMills = timeoutMillis > Long.MAX_VALUE - beginTimeMillis
+                    ? Long.MAX_VALUE
+                    : beginTimeMillis + timeoutMillis;
+        }
+        while (true) {
+            // Check the state before the deadline so that a connection established during the
+            // last sleep is still reported as success.
+            if (ephemeralDateManager.getConnectionState() == ConnectionState.CONNECTED) {
+                return;
+            }
+            if (System.currentTimeMillis() > endTimeMills) {
+                throw new RegistryException(
+                        String.format("Cannot connect to mysql registry in %s s", timeout.getSeconds()));
+            }
+            try {
+                Thread.sleep(mysqlRegistryProperties.getTermRefreshInterval().toMillis());
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new RegistryException("Cannot connect to mysql registry due to interrupted exception", e);
+            }
+        }
+    }
+
     @Override
     public boolean subscribe(String path, SubscribeListener listener) {
         // new a schedule thread to query the path, if the path
@@ -156,15 +178,15 @@ public class MysqlRegistry implements Registry {
         return true;
     }
 
-
     @Override
     public void close() {
         LOGGER.info("Closing Mysql Registry...");
         // remove the current Ephemeral node, if can connect to mysql
-        try (EphemeralDateManager closed1 = ephemeralDateManager;
-             SubscribeDataManager close2 = subscribeDataManager;
-             RegistryLockManager close3 = registryLockManager;
-             MysqlOperator closed4 = mysqlOperator) {
+        try (
+                EphemeralDateManager closed1 = ephemeralDateManager;
+                SubscribeDataManager close2 = subscribeDataManager;
+                RegistryLockManager close3 = registryLockManager;
+                MysqlOperator closed4 = mysqlOperator) {
         } catch (Exception e) {
             LOGGER.error("Close Mysql Registry error", e);
         }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
index 7e8bc53882..45bd992134 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-mysql/src/main/java/org/apache/dolphinscheduler/plugin/registry/mysql/task/EphemeralDateManager.java
@@ -46,6 +46,7 @@ public class EphemeralDateManager implements AutoCloseable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(EphemeralDateManager.class);
 
+    private ConnectionState connectionState;
     private final MysqlOperator mysqlOperator;
     private final MysqlRegistryProperties registryProperties;
     private final List<ConnectionListener> connectionListeners = Collections.synchronizedList(new ArrayList<>());
@@ -78,6 +79,10 @@ public class EphemeralDateManager implements AutoCloseable {
         return ephemeralId;
     }
 
+    /**
+     * Returns the value currently held in this manager's {@code connectionState} field.
+     *
+     * <p>NOTE(review): the backing field is not declared {@code volatile}. If it is written by the
+     * term-refresh task on a different thread than the one polling this getter, the polling thread
+     * may observe a stale value — confirm, and consider making the field volatile.
+     *
+     * @return the stored {@link ConnectionState}; presumably {@code null} until the state is first
+     *         assigned — TODO confirm the initial value
+     */
+    public ConnectionState getConnectionState() {
+        return connectionState;
+    }
+
     @Override
     public void close() throws SQLException {
         ephemeralDateIds.clear();
@@ -89,11 +94,11 @@ public class EphemeralDateManager implements AutoCloseable {
     }
 
     // Use this task to refresh ephemeral term and check the connect state.
-    static class EphemeralDateTermRefreshTask implements Runnable {
+    class EphemeralDateTermRefreshTask implements Runnable {
+
         private final List<ConnectionListener> connectionListeners;
         private final Set<Long> ephemeralDateIds;
         private final MysqlOperator mysqlOperator;
-        private ConnectionState connectionState;
 
         private EphemeralDateTermRefreshTask(MysqlOperator mysqlOperator,
                                              List<ConnectionListener> connectionListeners,
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index 643ab8e707..5aa99b5198 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -17,14 +17,8 @@
 
 package org.apache.dolphinscheduler.plugin.registry.zookeeper;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import org.apache.dolphinscheduler.registry.api.ConnectionListener;
-import org.apache.dolphinscheduler.registry.api.Event;
-import org.apache.dolphinscheduler.registry.api.Registry;
-import org.apache.dolphinscheduler.registry.api.RegistryException;
-import org.apache.dolphinscheduler.registry.api.SubscribeListener;
-
+import com.google.common.base.Strings;
+import lombok.NonNull;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -34,11 +28,19 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Comparator;
@@ -47,16 +49,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.annotation.PostConstruct;
-
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-import com.google.common.base.Strings;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 @Component
 @ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "zookeeper")
 public final class ZookeeperRegistry implements Registry {
+
     private final ZookeeperRegistryProperties.ZookeeperProperties properties;
     private final CuratorFramework client;
 
@@ -68,17 +66,17 @@ public final class ZookeeperRegistry implements Registry {
         properties = registryProperties.getZookeeper();
 
         final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
-            (int) properties.getRetryPolicy().getBaseSleepTime().toMillis(),
-            properties.getRetryPolicy().getMaxRetries(),
-            (int) properties.getRetryPolicy().getMaxSleep().toMillis());
+                (int) properties.getRetryPolicy().getBaseSleepTime().toMillis(),
+                properties.getRetryPolicy().getMaxRetries(),
+                (int) properties.getRetryPolicy().getMaxSleep().toMillis());
 
         CuratorFrameworkFactory.Builder builder =
-            CuratorFrameworkFactory.builder()
-                                   .connectString(properties.getConnectString())
-                                   .retryPolicy(retryPolicy)
-                                   .namespace(properties.getNamespace())
-                                   .sessionTimeoutMs((int) properties.getSessionTimeout().toMillis())
-                                   .connectionTimeoutMs((int) properties.getConnectionTimeout().toMillis());
+                CuratorFrameworkFactory.builder()
+                        .connectString(properties.getConnectString())
+                        .retryPolicy(retryPolicy)
+                        .namespace(properties.getNamespace())
+                        .sessionTimeoutMs((int) properties.getSessionTimeout().toMillis())
+                        .connectionTimeoutMs((int) properties.getConnectionTimeout().toMillis());
 
         final String digest = properties.getDigest();
         if (!Strings.isNullOrEmpty(digest)) {
@@ -89,17 +87,18 @@ public final class ZookeeperRegistry implements Registry {
 
     private void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
         builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
-               .aclProvider(new ACLProvider() {
-                   @Override
-                   public List<ACL> getDefaultAcl() {
-                       return ZooDefs.Ids.CREATOR_ALL_ACL;
-                   }
-
-                   @Override
-                   public List<ACL> getAclForPath(final String path) {
-                       return ZooDefs.Ids.CREATOR_ALL_ACL;
-                   }
-               });
+                .aclProvider(new ACLProvider() {
+
+                    @Override
+                    public List<ACL> getDefaultAcl() {
+                        return ZooDefs.Ids.CREATOR_ALL_ACL;
+                    }
+
+                    @Override
+                    public List<ACL> getAclForPath(final String path) {
+                        return ZooDefs.Ids.CREATOR_ALL_ACL;
+                    }
+                });
     }
 
     @PostConstruct
@@ -112,6 +111,7 @@ public final class ZookeeperRegistry implements Registry {
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+            throw new RegistryException("Zookeeper registry start failed", e);
         }
     }
 
@@ -120,6 +120,22 @@ public final class ZookeeperRegistry implements Registry {
         client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
     }
 
+    /**
+     * Blocks the calling thread until the Curator client is connected to Zookeeper or the given
+     * timeout elapses.
+     *
+     * @param timeout maximum time to wait, passed to Curator in milliseconds and clamped to the
+     *                {@code int} range. NOTE(review): Curator documents a non-positive wait time as
+     *                "wait indefinitely" — confirm against the Curator version in use
+     * @throws RegistryException if the client is not connected when the wait ends, or if the waiting
+     *                           thread is interrupted (the interrupt flag is restored first)
+     */
+    @Override
+    public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException {
+        // blockUntilConnected takes an int. Clamp instead of casting so that a timeout larger than
+        // Integer.MAX_VALUE ms (about 24.8 days) does not wrap to a negative or much shorter wait.
+        long timeoutMillis = timeout.toMillis();
+        int maxWaitMillis = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
+        try {
+            if (!client.blockUntilConnected(maxWaitMillis, MILLISECONDS)) {
+                throw new RegistryException(
+                        String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()));
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RegistryException(
+                    String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()), e);
+        }
+    }
+
     @Override
     public boolean subscribe(String path, SubscribeListener listener) {
         final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
@@ -162,10 +178,10 @@ public final class ZookeeperRegistry implements Registry {
 
         try {
             client.create()
-                  .orSetData()
-                  .creatingParentsIfNeeded()
-                  .withMode(mode)
-                  .forPath(key, value.getBytes(StandardCharsets.UTF_8));
+                    .orSetData()
+                    .creatingParentsIfNeeded()
+                    .withMode(mode)
+                    .forPath(key, value.getBytes(StandardCharsets.UTF_8));
         } catch (Exception e) {
             throw new RegistryException("Failed to put registry key: " + key, e);
         }
@@ -186,8 +202,8 @@ public final class ZookeeperRegistry implements Registry {
     public void delete(String nodePath) {
         try {
             client.delete()
-                  .deletingChildrenIfNeeded()
-                  .forPath(nodePath);
+                    .deletingChildrenIfNeeded()
+                    .forPath(nodePath);
         } catch (KeeperException.NoNodeException ignored) {
             // Is already deleted or does not exist
         } catch (Exception e) {
@@ -239,6 +255,7 @@ public final class ZookeeperRegistry implements Registry {
     }
 
     static final class EventAdaptor extends Event {
+
         public EventAdaptor(TreeCacheEvent event, String key) {
             key(key);
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index c84abb4182..b146a25482 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -17,15 +17,15 @@
 
 package org.apache.dolphinscheduler.server.registry;
 
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Heart beat task
  */
@@ -36,25 +36,18 @@ public class HeartBeatTask implements Runnable {
     private final Set<String> heartBeatPaths;
     private final RegistryClient registryClient;
     private int workerWaitingTaskCount;
-    private final String serverType;
     private final HeartBeat heartBeat;
 
-    private final int heartBeatErrorThreshold;
-
     private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
 
     public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          Set<String> heartBeatPaths,
-                         String serverType,
-                         RegistryClient registryClient,
-                         int heartBeatErrorThreshold) {
+                         RegistryClient registryClient) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
-        this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public HeartBeatTask(long startupTime,
@@ -62,17 +55,13 @@ public class HeartBeatTask implements Runnable {
                          double reservedMemory,
                          int hostWeight,
                          Set<String> heartBeatPaths,
-                         String serverType,
                          RegistryClient registryClient,
                          int workerThreadCount,
-                         int workerWaitingTaskCount,
-                         int heartBeatErrorThreshold) {
+                         int workerWaitingTaskCount) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.workerWaitingTaskCount = workerWaitingTaskCount;
-        this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
-        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public String getHeartBeatInfo() {
@@ -82,14 +71,9 @@ public class HeartBeatTask implements Runnable {
     @Override
     public void run() {
         try {
-            // check dead or not in zookeeper
-            for (String heartBeatPath : heartBeatPaths) {
-                if (registryClient.checkIsDeadServer(heartBeatPath, serverType)) {
-                    registryClient.getStoppable().stop("i was judged to death, release resources and stop myself");
-                    return;
-                }
+            if (!ServerLifeCycleManager.isRunning()) {
+                return;
             }
-
             // update waiting task count
             heartBeat.setWorkerWaitingTaskCount(workerWaitingTaskCount);
 
@@ -98,11 +82,7 @@ public class HeartBeatTask implements Runnable {
             }
             heartBeatErrorTimes.set(0);
         } catch (Throwable ex) {
-            logger.error("HeartBeat task execute failed", ex);
-            if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
-                registryClient.getStoppable()
-                              .stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
-            }
+            logger.error("HeartBeat task execute failed, errorTimes: {}", heartBeatErrorTimes.get(), ex);
         }
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
index da62ba47ef..81bf76a082 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cron/CronUtils.java
@@ -17,26 +17,21 @@
 
 package org.apache.dolphinscheduler.service.cron;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
-import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
-
-import static com.cronutils.model.CronType.QUARTZ;
-
+import com.cronutils.model.Cron;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.model.time.ExecutionTime;
+import com.cronutils.parser.CronParser;
+import lombok.NonNull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CycleEnum;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
@@ -49,29 +44,30 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cronutils.model.Cron;
-import com.cronutils.model.definition.CronDefinitionBuilder;
-import com.cronutils.model.time.ExecutionTime;
-import com.cronutils.parser.CronParser;
-
-import lombok.NonNull;
+import static com.cronutils.model.CronType.QUARTZ;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.COMMA;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.day;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.hour;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.min;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.month;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.week;
+import static org.apache.dolphinscheduler.service.cron.CycleFactory.year;
 
 /**
  * // todo: this utils is heavy, it rely on quartz and corn-utils.
  * cron utils
  */
 public class CronUtils {
+
     private CronUtils() {
         throw new IllegalStateException("CronUtils class");
     }
 
     private static final Logger logger = LoggerFactory.getLogger(CronUtils.class);
 
-
-    private static final CronParser QUARTZ_CRON_PARSER = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
+    private static final CronParser QUARTZ_CRON_PARSER =
+            new CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
 
     /**
      * parse to cron
@@ -94,7 +90,8 @@ public class CronUtils {
      * @return CycleEnum
      */
     public static CycleEnum getMaxCycle(Cron cron) {
-        return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron)).addCycle(year(cron)).getCycle();
+        return min(cron).addCycle(hour(cron)).addCycle(day(cron)).addCycle(week(cron)).addCycle(month(cron))
+                .addCycle(year(cron)).getCycle();
     }
 
     /**
@@ -105,11 +102,11 @@ public class CronUtils {
      */
     public static CycleEnum getMiniCycle(Cron cron) {
         return min(cron).addCycle(hour(cron))
-            .addCycle(day(cron))
-            .addCycle(week(cron))
-            .addCycle(month(cron))
-            .addCycle(year(cron))
-            .getMiniCycle();
+                .addCycle(day(cron))
+                .addCycle(week(cron))
+                .addCycle(month(cron))
+                .addCycle(year(cron))
+                .getMiniCycle();
     }
 
     /**
@@ -126,7 +123,6 @@ public class CronUtils {
         }
     }
 
-
     public static List<ZonedDateTime> getFireDateList(@NonNull ZonedDateTime startTime,
                                                       @NonNull ZonedDateTime endTime,
                                                       @NonNull String cron) throws CronParseException {
@@ -147,7 +143,7 @@ public class CronUtils {
         List<ZonedDateTime> dateList = new ArrayList<>();
         ExecutionTime executionTime = ExecutionTime.forCron(cron);
 
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             Optional<ZonedDateTime> nextExecutionTimeOptional = executionTime.nextExecution(startTime);
             if (!nextExecutionTimeOptional.isPresent()) {
                 break;
@@ -198,8 +194,8 @@ public class CronUtils {
         ZonedDateTime zonedDateTimeEnd = ZonedDateTime.ofInstant(endTime.toInstant(), ZoneId.systemDefault());
 
         return getSelfFireDateList(zonedDateTimeStart, zonedDateTimeEnd, schedules).stream()
-            .map(zonedDateTime -> new Date(zonedDateTime.toInstant().toEpochMilli()))
-            .collect(Collectors.toList());
+                .map(zonedDateTime -> new Date(zonedDateTime.toInstant().toEpochMilli()))
+                .collect(Collectors.toList());
     }
 
     /**
@@ -208,8 +204,7 @@ public class CronUtils {
      */
     public static List<ZonedDateTime> getSelfFireDateList(@NonNull final ZonedDateTime startTime,
                                                           @NonNull final ZonedDateTime endTime,
-                                                          @NonNull final List<Schedule> schedules)
-        throws CronParseException {
+                                                          @NonNull final List<Schedule> schedules) throws CronParseException {
         List<ZonedDateTime> result = new ArrayList<>();
         if (startTime.equals(endTime)) {
             result.add(startTime);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 46c10c15d5..a6c6218fee 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -17,19 +17,8 @@
 
 package org.apache.dolphinscheduler.service.registry;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
-import static org.apache.dolphinscheduler.common.Constants.COLON;
-import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
-import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
-import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
-
+import com.google.common.base.Strings;
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -40,7 +29,11 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.Registry;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -52,16 +45,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import com.google.common.base.Strings;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 
 @Component
 public class RegistryClient {
+
     private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
 
     private static final String EMPTY = "";
@@ -78,6 +71,10 @@ public class RegistryClient {
         initNodes();
     }
 
+    /**
+     * Delegates to {@code connectUntilTimeout} on the underlying registry.
+     *
+     * @param duration timeout handed unchanged to the registry implementation; must not be null
+     * @throws RegistryException propagated from the registry implementation
+     */
+    public void connectUntilTimeout(@NonNull Duration duration) throws RegistryException {
+        registry.connectUntilTimeout(duration);
+    }
+
     public int getActiveMasterNum() {
         Collection<String> childrenList = new ArrayList<>();
         try {
@@ -142,33 +139,8 @@ public class RegistryClient {
 
     public boolean checkNodeExists(String host, NodeType nodeType) {
         return getServerMaps(nodeType, true).keySet()
-                                            .stream()
-                                            .anyMatch(it -> it.contains(host));
-    }
-
-    public void handleDeadServer(Collection<String> nodes, NodeType nodeType, String opType) {
-        nodes.forEach(node -> {
-            final String host = getHostByEventDataPath(node);
-            final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE;
-
-            if (opType.equals(DELETE_OP)) {
-                removeDeadServerByHost(host, type);
-            } else if (opType.equals(ADD_OP)) {
-                String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host;
-                // Add dead server info to zk dead server path : /dead-servers/
-                registry.put(deadServerPath, type + UNDERLINE + host, false);
-                logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node);
-            }
-        });
-    }
-
-    public boolean checkIsDeadServer(String node, String serverType) {
-        // ip_sequence_no
-        String[] zNodesPath = node.split("/");
-        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
-        String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
-
-        return !exists(node) || exists(deadServerPath);
+                .stream()
+                .anyMatch(it -> it.contains(host));
     }
 
     public Collection<String> getMasterNodesDirectly() {
@@ -271,7 +243,6 @@ public class RegistryClient {
     private void initNodes() {
         registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
         registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
-        registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false);
     }
 
     private String rootNodePath(NodeType type) {
@@ -280,8 +251,6 @@ public class RegistryClient {
                 return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
             case WORKER:
                 return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-            case DEAD_SERVER:
-                return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
             default:
                 throw new IllegalStateException("Should not reach here");
         }
@@ -293,21 +262,9 @@ public class RegistryClient {
         if (nodeType != NodeType.WORKER) {
             return serverList;
         }
-        return serverList.stream().flatMap(group ->
-            getChildrenKeys(path + SINGLE_SLASH + group)
+        return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group)
                 .stream()
-                .map(it -> group + SINGLE_SLASH + it)
-        ).collect(Collectors.toList());
+                .map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList());
     }
 
-    private void removeDeadServerByHost(String host, String serverType) {
-        Collection<String> deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS);
-        for (String serverPath : deadServers) {
-            if (serverPath.startsWith(serverType + UNDERLINE + host)) {
-                String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
-                remove(server);
-                logger.info("{} server {} deleted from zk dead server path success", serverType, host);
-            }
-        }
-    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 6b4b27a46e..f29f93eed3 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,9 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
@@ -33,13 +34,6 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.Collection;
-
-import javax.annotation.PostConstruct;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,16 +43,17 @@ import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+
 @SpringBootApplication
 @EnableTransactionManagement
-@ComponentScan(basePackages = "org.apache.dolphinscheduler",
-        excludeFilters = {
-                @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
-                        "org.apache.dolphinscheduler.service.process.*",
-                        "org.apache.dolphinscheduler.service.queue.*",
-                })
-        }
-)
+@ComponentScan(basePackages = "org.apache.dolphinscheduler", excludeFilters = {
+        @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
+                "org.apache.dolphinscheduler.service.process.*",
+                "org.apache.dolphinscheduler.service.queue.*",
+        })
+})
 public class WorkerServer implements IStoppable {
 
     /**
@@ -116,9 +111,8 @@ public class WorkerServer implements IStoppable {
         this.workerRpcClient.start();
         this.taskPluginManager.loadPlugin();
 
-        this.workerRegistryClient.registry();
         this.workerRegistryClient.setRegistryStoppable(this);
-        this.workerRegistryClient.handleDeadServer();
+        this.workerRegistryClient.start();
 
         this.workerManagerThread.start();
 
@@ -128,23 +122,24 @@ public class WorkerServer implements IStoppable {
          * registry hooks, which are called before the process exits
          */
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            if (Stopper.isRunning()) {
+            if (!ServerLifeCycleManager.isStopped()) {
                 close("WorkerServer shutdown hook");
             }
         }));
     }
 
     public void close(String cause) {
-        if (!Stopper.stop()) {
+        if (!ServerLifeCycleManager.toStopped()) {
             logger.warn("WorkerServer is already stopped, current cause: {}", cause);
             return;
         }
         ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
 
-        try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
-             WorkerRegistryClient closedRegistryClient = workerRegistryClient;
-             AlertClientService closedAlertClientService = alertClientService;
-             SpringApplicationContext closedSpringContext = springApplicationContext;) {
+        try (
+                WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
+                WorkerRegistryClient closedRegistryClient = workerRegistryClient;
+                AlertClientService closedAlertClientService = alertClientService;
+                SpringApplicationContext closedSpringContext = springApplicationContext;) {
             logger.info("Worker server is stopping, current cause : {}", cause);
             // kill running tasks
             this.killAllRunningTasks();
@@ -173,7 +168,8 @@ public class WorkerServer implements IStoppable {
         for (TaskExecutionContext taskRequest : taskRequests) {
             // kill task when it's not finished yet
             try {
-                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId());
+                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
+                        taskRequest.getTaskInstanceId());
                 if (ProcessUtils.kill(taskRequest)) {
                     killNumber++;
                 }
@@ -181,6 +177,7 @@ public class WorkerServer implements IStoppable {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
-        logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber);
+        logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(),
+                killNumber);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 2367975715..fa1db46121 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,33 +17,32 @@
 
 package org.apache.dolphinscheduler.server.worker.config;
 
+import com.google.common.collect.Sets;
+import lombok.Data;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
-
-import java.time.Duration;
-import java.util.Set;
-
+import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.validation.Errors;
 import org.springframework.validation.Validator;
 import org.springframework.validation.annotation.Validated;
 
-import com.google.common.collect.Sets;
-
-import lombok.Data;
+import java.time.Duration;
+import java.util.Set;
 
 @Data
 @Validated
 @Configuration
 @ConfigurationProperties(prefix = "worker")
 public class WorkerConfig implements Validator {
+
+    private Logger logger = LoggerFactory.getLogger(WorkerConfig.class);
+
     private int listenPort = 1234;
     private int execThreads = 10;
     private Duration heartbeatInterval = Duration.ofSeconds(10);
-    /**
-     * Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
-     */
-    private int heartbeatErrorThreshold = 5;
     private int hostWeight = 100;
     private boolean tenantAutoCreate = true;
     private boolean tenantDistributedUser = false;
@@ -52,6 +51,8 @@ public class WorkerConfig implements Validator {
     private Set<String> groups = Sets.newHashSet("default");
     private String alertListenHost = "localhost";
     private int alertListenPort = 50052;
+    private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
+
     /**
      * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
      */
@@ -74,9 +75,23 @@ public class WorkerConfig implements Validator {
         if (workerConfig.getMaxCpuLoadAvg() <= 0) {
             workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
-        if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
-            errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
-        }
         workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
+        printConfig();
+    }
+
+    private void printConfig() { // logs each effective worker setting at INFO, one property per line
+        logger.info("Worker config: listenPort -> {}", listenPort);
+        logger.info("Worker config: execThreads -> {}", execThreads);
+        logger.info("Worker config: heartbeatInterval -> {}", heartbeatInterval);
+        logger.info("Worker config: hostWeight -> {}", hostWeight);
+        logger.info("Worker config: tenantAutoCreate -> {}", tenantAutoCreate);
+        logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
+        logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
+        logger.info("Worker config: reservedMemory -> {}", reservedMemory);
+        logger.info("Worker config: groups -> {}", groups);
+        logger.info("Worker config: alertListenHost -> {}", alertListenHost);
+        logger.info("Worker config: alertListenPort -> {}", alertListenPort);
+        logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
+        logger.info("Worker config: workerAddress -> {}", workerAddress); // the address computed in validate()
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
index 18dceb069b..a00c8d67a6 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -17,27 +17,24 @@
 
 package org.apache.dolphinscheduler.server.worker.message;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.BaseCommand;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
 
-import lombok.NonNull;
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Component
 public class MessageRetryRunner extends BaseDaemonThread {
@@ -75,7 +72,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
 
     public void addRetryMessage(int taskInstanceId, @NonNull CommandType messageType, BaseCommand baseCommand) {
         needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>()).put(messageType,
-                                                                                                baseCommand);
+                baseCommand);
     }
 
     public void removeRetryMessage(int taskInstanceId, @NonNull CommandType messageType) {
@@ -99,7 +96,7 @@ public class MessageRetryRunner extends BaseDaemonThread {
     }
 
     public void run() {
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
                 if (needToRetryMessages.isEmpty()) {
                     Thread.sleep(MESSAGE_RETRY_WINDOW);
@@ -136,4 +133,8 @@ public class MessageRetryRunner extends BaseDaemonThread {
             }
         }
     }
+
+    public void clearMessage() {
+        needToRetryMessages.clear();
+    }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
new file mode 100644
index 0000000000..260b7e0390
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
+
+public interface WorkerConnectStrategy extends ConnectStrategy {
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
similarity index 58%
copy from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
copy to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
index d0b86fc660..36b95e1b62 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListener.java
@@ -15,47 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.registry;
+package org.apache.dolphinscheduler.server.worker.registry;
 
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import lombok.NonNull;
-
-public class MasterConnectionStateListener implements ConnectionListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
+public class WorkerConnectionStateListener implements ConnectionListener {
 
-    private final String masterNodePath;
+    private final Logger logger = LoggerFactory.getLogger(WorkerConnectionStateListener.class);
+    private final WorkerConfig workerConfig;
     private final RegistryClient registryClient;
+    private final WorkerConnectStrategy workerConnectStrategy;
 
-    public MasterConnectionStateListener(@NonNull String masterNodePath, @NonNull RegistryClient registryClient) {
-        this.masterNodePath = masterNodePath;
+    public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig,
+                                         @NonNull RegistryClient registryClient,
+                                         @NonNull WorkerConnectStrategy workerConnectStrategy) {
+        this.workerConfig = workerConfig;
         this.registryClient = registryClient;
+        this.workerConnectStrategy = workerConnectStrategy;
     }
 
     @Override
     public void onUpdate(ConnectionState state) {
+        logger.info("Worker received a {} event from registry, the current server state is {}", state,
+                ServerLifeCycleManager.getServerStatus());
         switch (state) {
             case CONNECTED:
-                logger.debug("registry connection state is {}", state);
                 break;
             case SUSPENDED:
-                logger.warn("registry connection state is {}, ready to retry connection", state);
                 break;
             case RECONNECTED:
-                logger.debug("registry connection state is {}, clean the node info", state);
-                registryClient.remove(masterNodePath);
-                registryClient.persistEphemeral(masterNodePath, "");
+                workerConnectStrategy.reconnect();
                 break;
             case DISCONNECTED:
-                logger.warn("registry connection state is {}, ready to stop myself", state);
-                registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
-                break;
+                workerConnectStrategy.disconnect(); // NOTE: no break here; falls through to the empty default
             default:
         }
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 96d41c38c0..225439b33d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -17,22 +17,25 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.io.IOException;
 import java.util.Set;
 import java.util.StringJoiner;
@@ -40,15 +43,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.PostConstruct;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 /**
  * worker registry
@@ -78,6 +76,9 @@ public class WorkerRegistryClient implements AutoCloseable {
     @Autowired
     private RegistryClient registryClient;
 
+    @Autowired
+    private WorkerConnectStrategy workerConnectStrategy;
+
     /**
      * worker startup time, ms
      */
@@ -89,27 +90,36 @@ public class WorkerRegistryClient implements AutoCloseable {
     public void initWorkRegistry() {
         this.workerGroups = workerConfig.getGroups();
         this.startupTime = System.currentTimeMillis();
-        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+        this.heartBeatExecutor =
+                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+    }
+
+    public void start() {
+        try {
+            registry();
+            registryClient.addConnectionStateListener(
+                    new WorkerConnectionStateListener(workerConfig, registryClient, workerConnectStrategy));
+        } catch (Exception ex) {
+            throw new RegistryException("Worker registry client start up error", ex);
+        }
     }
 
     /**
      * registry
      */
-    public void registry() {
+    private void registry() {
         String address = NetUtils.getAddr(workerConfig.getListenPort());
         Set<String> workerZkPaths = getWorkerZkPaths();
         long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                                                        workerConfig.getMaxCpuLoadAvg(),
-                                                        workerConfig.getReservedMemory(),
-                                                        workerConfig.getHostWeight(),
-                                                        workerZkPaths,
-                                                        Constants.WORKER_TYPE,
-                                                        registryClient,
-                                                        workerConfig.getExecThreads(),
-                                                        workerManagerThread.getThreadPoolQueueSize(),
-                                                        workerConfig.getHeartbeatErrorThreshold());
+                workerConfig.getMaxCpuLoadAvg(),
+                workerConfig.getReservedMemory(),
+                workerConfig.getHostWeight(),
+                workerZkPaths,
+                registryClient,
+                workerConfig.getExecThreads(),
+                workerManagerThread.getThreadPoolQueueSize());
 
         for (String workerZKPath : workerZkPaths) {
             // remove before persist
@@ -125,37 +135,11 @@ public class WorkerRegistryClient implements AutoCloseable {
         // sleep 1s, waiting master failover remove
         ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
 
-        // delete dead server
-        registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
-
-        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
+        this.heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval,
+                TimeUnit.SECONDS);
         logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
     }
 
-    /**
-     * remove registry info
-     */
-    public void unRegistry() throws IOException {
-        try {
-            String address = getLocalAddress();
-            Set<String> workerZkPaths = getWorkerZkPaths();
-            for (String workerZkPath : workerZkPaths) {
-                registryClient.remove(workerZkPath);
-                logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
-            }
-        } catch (Exception ex) {
-            logger.error("remove worker zk path exception", ex);
-        }
-
-        if (heartBeatExecutor != null) {
-            heartBeatExecutor.shutdownNow();
-            logger.info("Heartbeat executor shutdown");
-        }
-
-        registryClient.close();
-        logger.info("registry client closed");
-    }
-
     /**
      * get worker path
      */
@@ -177,11 +161,6 @@ public class WorkerRegistryClient implements AutoCloseable {
         return workerPaths;
     }
 
-    public void handleDeadServer() {
-        Set<String> workerZkPaths = getWorkerZkPaths();
-        registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
-    }
-
     /**
      * get local address
      */
@@ -195,7 +174,12 @@ public class WorkerRegistryClient implements AutoCloseable {
 
     @Override
     public void close() throws IOException {
-        unRegistry();
+        if (heartBeatExecutor != null) {
+            heartBeatExecutor.shutdownNow();
+            logger.info("Heartbeat executor shutdown");
+        }
+        registryClient.close();
+        logger.info("registry client closed");
     }
 
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
new file mode 100644
index 0000000000..08d434ea2d
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+@Service
+@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
+public class WorkerStopStrategy implements WorkerConnectStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerStopStrategy.class);
+
+    @Autowired
+    public RegistryClient registryClient;
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Override
+    public void disconnect() {
+        registryClient.getStoppable()
+                .stop("Worker disconnected from registry, will stop myself due to the stop strategy");
+    }
+
+    @Override
+    public void reconnect() {
+        logger.warn("The current connect strategy is stop, so the worker will not reconnect to registry");
+    }
+
+    @Override
+    public StrategyType getStrategyType() {
+        return StrategyType.STOP;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
new file mode 100644
index 0000000000..203e5711f1
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.StrategyType;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+
+@Service
+@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
+public class WorkerWaitingStrategy implements WorkerConnectStrategy {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerWaitingStrategy.class);
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private WorkerRpcServer workerRpcServer;
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
+    @Autowired
+    private WorkerManagerThread workerManagerThread;
+
+    @Override
+    public void disconnect() {
+        try {
+            ServerLifeCycleManager.toWaiting();
+            clearWorkerResource();
+            Duration maxWaitingTime = workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
+            try {
+                logger.info("Worker disconnect from registry will try to reconnect in {} s",
+                        maxWaitingTime.getSeconds());
+                registryClient.connectUntilTimeout(maxWaitingTime);
+            } catch (RegistryException ex) {
+                throw new ServerLifeCycleException(
+                        String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
+            }
+        } catch (ServerLifeCycleException e) {
+            String errorMessage = String.format(
+                    "Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
+                    ServerLifeCycleManager.getServerStatus());
+            logger.error(errorMessage, e);
+            registryClient.getStoppable().stop(errorMessage);
+        } catch (RegistryException ex) {
+            String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
+            logger.error(errorMessage, ex);
+            registryClient.getStoppable().stop(errorMessage);
+        } catch (Exception ex) {
+            String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
+            logger.error(errorMessage, ex);
+            registryClient.getStoppable().stop(errorMessage);
+        }
+    }
+
+    @Override
+    public void reconnect() {
+        try {
+            ServerLifeCycleManager.recoverFromWaiting();
+            reStartWorkerResource();
+            logger.info("Recover from waiting success, the current server status is {}",
+                    ServerLifeCycleManager.getServerStatus());
+        } catch (Exception e) {
+            String errorMessage =
+                    String.format("Recover from waiting failed, the current server status is %s, will stop the server",
+                            ServerLifeCycleManager.getServerStatus());
+            logger.error(errorMessage, e);
+            registryClient.getStoppable().stop(errorMessage);
+        }
+
+    }
+
+    @Override
+    public StrategyType getStrategyType() {
+        return StrategyType.WAITING;
+    }
+
+    private void clearWorkerResource() {
+        // close the worker resource, if close failed should stop the worker server
+        workerRpcServer.close();
+        logger.warn("Worker server close the RPC server due to lost connection from registry");
+        workerRpcClient.close();
+        logger.warn("Worker server close the RPC client due to lost connection from registry");
+        workerManagerThread.clearTask();
+        logger.warn("Worker server clear the tasks due to lost connection from registry");
+        messageRetryRunner.clearMessage();
+        logger.warn("Worker server clear the retry message due to lost connection from registry");
+
+    }
+
+    private void reStartWorkerResource() {
+        // restart the RPC server and client; any exception propagates to reconnect(), which stops the worker
+        workerRpcServer.start();
+        logger.warn("Worker server restart RPC server due to reconnect to registry");
+        workerRpcClient.start();
+        logger.warn("Worker server restart RPC client due to reconnect to registry");
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index 98de315345..46ecfd0d5b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -33,6 +34,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 public class WorkerExecService {
+
     /**
      * logger of WorkerExecService
      */
@@ -50,7 +52,8 @@ public class WorkerExecService {
      */
     private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
 
-    public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
+    public WorkerExecService(ExecutorService execService,
+                             ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
         this.execService = execService;
         this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
         this.taskExecuteThreadMap = taskExecuteThreadMap;
@@ -61,6 +64,7 @@ public class WorkerExecService {
         taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
         ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
         FutureCallback futureCallback = new FutureCallback() {
+
             @Override
             public void onSuccess(Object o) {
                 taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
@@ -69,9 +73,9 @@ public class WorkerExecService {
             @Override
             public void onFailure(Throwable throwable) {
                 logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
-                             taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
-                             taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
-                             throwable);
+                        taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
+                        taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
+                        throwable);
                 taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
             }
         };
@@ -87,4 +91,8 @@ public class WorkerExecService {
         return ((ThreadPoolExecutor) this.execService).getQueue().size();
     }
 
-} 
\ No newline at end of file
+    public Map<Integer, TaskExecuteThread> getTaskExecuteThreadMap() {
+        return taskExecuteThreadMap; // the live internal map, not a copy; changes by callers affect this service
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 8b6365172a..2466700da1 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -18,22 +18,21 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+
 /**
  * Manage tasks
  */
@@ -72,9 +71,8 @@ public class WorkerManagerThread implements Runnable {
         workerExecThreads = workerConfig.getExecThreads();
         this.waitSubmitQueue = new DelayQueue<>();
         workerExecService = new WorkerExecService(
-            ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
-            taskExecuteThreadMap
-        );
+                ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
+                taskExecuteThreadMap);
     }
 
     public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
@@ -105,11 +103,11 @@ public class WorkerManagerThread implements Runnable {
      */
     public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
         waitSubmitQueue.stream()
-                          .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
-                          .forEach(waitSubmitQueue::remove);
+                .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext()
+                        .getTaskInstanceId() == taskInstanceId)
+                .forEach(waitSubmitQueue::remove);
     }
 
-
     /**
      * submit task
      *
@@ -140,8 +138,11 @@ public class WorkerManagerThread implements Runnable {
     public void run() {
         Thread.currentThread().setName("Worker-Execute-Manager-Thread");
         TaskExecuteThread taskExecuteThread;
-        while (Stopper.isRunning()) {
+        while (!ServerLifeCycleManager.isStopped()) {
             try {
+                if (!ServerLifeCycleManager.isRunning()) {
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                }
                 if (this.getThreadPoolQueueSize() <= workerExecThreads) {
                     taskExecuteThread = waitSubmitQueue.take();
                     workerExecService.submit(taskExecuteThread);
@@ -153,8 +154,14 @@ public class WorkerManagerThread implements Runnable {
                 }
             } catch (Exception e) {
                 logger.error("An unexpected interrupt is happened, "
-                    + "the exception will be ignored and this thread will continue to run", e);
+                        + "the exception will be ignored and this thread will continue to run", e);
             }
         }
     }
+
+    public void clearTask() {
+        waitSubmitQueue.clear(); // discard tasks queued but not yet submitted to the exec service
+        workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill);
+        workerExecService.getTaskExecuteThreadMap().clear(); // runs after the kill pass above
+    }
 }
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index a9c3eadf3c..da003e9a4a 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -60,8 +60,6 @@ worker:
   exec-threads: 100
   # worker heartbeat interval
   heartbeat-interval: 10s
-  # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
-  heartbeat-error-threshold: 5
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
@@ -78,6 +76,11 @@ worker:
   # alert server listen host
   alert-listen-host: localhost
   alert-listen-port: 50052
+  registry-disconnect-strategy:
+    # The disconnect strategy: stop, waiting
+    strategy: waiting
+    # The max waiting time to reconnect to registry if you set the strategy to waiting
+    max-waiting-time: 100s
 
 server:
   port: 1235
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 4bd2161e83..1d8e7ea516 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -68,8 +68,11 @@ public class WorkerRegistryClientTest {
 
     @Mock
     private WorkerManagerThread workerManagerThread;
-    
-    //private static final Set<String> workerGroups;
+
+    @Mock
+    private WorkerConnectStrategy workerConnectStrategy;
+
+    // private static final Set<String> workerGroups;
 
     static {
         // workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
@@ -78,14 +81,15 @@ public class WorkerRegistryClientTest {
     @Before
     public void before() {
         given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
-        //given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
-        //scheduleAtFixedRate
-        given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).willReturn(null);
+        // given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
+        // scheduleAtFixedRate
+        given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(),
+                Mockito.any(TimeUnit.class))).willReturn(null);
 
     }
 
     @Test
-    public void testRegistry() {
+    public void testStart() {
         workerRegistryClient.initWorkRegistry();
 
         given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
@@ -94,9 +98,8 @@ public class WorkerRegistryClientTest {
 
         given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
 
-        workerRegistryClient.registry();
+        workerRegistryClient.start();
 
-        Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString());
     }
 
     @Test