You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/18 11:27:21 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code (#4765)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc55b5b  [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code (#4765)
dc55b5b is described below

commit dc55b5ba6faffc3587ed0a2b78b2ceaf891844f5
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Feb 18 19:27:13 2021 +0800

    [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code (#4765)
    
    * [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code
---
 .../api/service/MonitorService.java                | 168 ++----
 .../api/service/ProcessInstanceService.java        | 610 +--------------------
 .../dolphinscheduler/api/service/QueueService.java | 229 +-------
 .../api/service/impl/MonitorServiceImpl.java       | 161 ++++++
 .../ProcessInstanceServiceImpl.java}               |  35 +-
 .../QueueServiceImpl.java}                         |  49 +-
 .../api/service/MonitorServiceTest.java            |   4 +-
 .../api/service/ProcessInstanceServiceTest.java    |   3 +-
 .../api/service/QueueServiceTest.java              |   3 +-
 .../apache/dolphinscheduler/common/Constants.java  |  12 +-
 .../server/registry/HeartBeatTask.java             |   9 +-
 11 files changed, 292 insertions(+), 991 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
index e46ca6f..51cba2c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
@@ -14,143 +14,51 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.api.service;
 
-import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.model.WorkerServerModel;
-import org.apache.dolphinscheduler.dao.MonitorDBDao;
-import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import com.google.common.collect.Sets;
 
 /**
  * monitor service
  */
-@Service
-public class MonitorService extends BaseService {
-
-  @Autowired
-  private ZookeeperMonitor zookeeperMonitor;
-
-  @Autowired
-  private MonitorDBDao monitorDBDao;
-  /**
-   * query database state
-   *
-   * @param loginUser login user
-   * @return data base state
-   */
-  public Map<String,Object> queryDatabaseState(User loginUser) {
-    Map<String, Object> result = new HashMap<>();
-
-    List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
-
-    result.put(Constants.DATA_LIST, monitorRecordList);
-    putMsg(result, Status.SUCCESS);
-
-    return result;
-
-  }
-
-  /**
-   * query master list
-   *
-   * @param loginUser login user
-   * @return master information list
-   */
-  public Map<String,Object> queryMaster(User loginUser) {
-
-    Map<String, Object> result = new HashMap<>();
-
-    List<Server> masterServers = getServerListFromZK(true);
-    result.put(Constants.DATA_LIST, masterServers);
-    putMsg(result,Status.SUCCESS);
-
-    return result;
-  }
-
-  /**
-   * query zookeeper state
-   *
-   * @param loginUser login user
-   * @return zookeeper information list
-   */
-  public Map<String,Object> queryZookeeperState(User loginUser) {
-    Map<String, Object> result = new HashMap<>();
-
-    List<ZookeeperRecord> zookeeperRecordList = zookeeperMonitor.zookeeperInfoList();
-
-    result.put(Constants.DATA_LIST, zookeeperRecordList);
-    putMsg(result, Status.SUCCESS);
-
-    return result;
-
-  }
-
-
-  /**
-   * query worker list
-   *
-   * @param loginUser login user
-   * @return worker information list
-   */
-  public Map<String,Object> queryWorker(User loginUser) {
-
-    Map<String, Object> result = new HashMap<>();
-    List<WorkerServerModel> workerServers = getServerListFromZK(false)
-            .stream()
-            .map((Server server) -> {
-              WorkerServerModel model = new WorkerServerModel();
-              model.setId(server.getId());
-              model.setHost(server.getHost());
-              model.setPort(server.getPort());
-              model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
-              model.setResInfo(server.getResInfo());
-              model.setCreateTime(server.getCreateTime());
-              model.setLastHeartbeatTime(server.getLastHeartbeatTime());
-              return model;
-            })
-            .collect(Collectors.toList());
-
-    Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
-            .stream()
-            .collect(Collectors.toMap(
-                    (WorkerServerModel worker) -> {
-                        String[] s = worker.getZkDirectories().iterator().next().split("/");
-                        return s[s.length - 1];
-                    }
-                    , Function.identity()
-                    , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
-                      oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
-                      return oldOne;
-                    }));
-
-    result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
-    putMsg(result,Status.SUCCESS);
-
-    return result;
-  }
-
-  public List<Server> getServerListFromZK(boolean isMaster) {
-
-    checkNotNull(zookeeperMonitor);
-    ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
-    return zookeeperMonitor.getServersList(zkNodeType);
-  }
-
+public interface MonitorService {
+
+    /**
+     * query database state
+     *
+     * @param loginUser login user
+     * @return database state
+     */
+    Map<String,Object> queryDatabaseState(User loginUser);
+    
+    /**
+     * query master list
+     *
+     * @param loginUser login user
+     * @return master information list
+     */
+    Map<String,Object> queryMaster(User loginUser);
+    
+    /**
+     * query zookeeper state
+     *
+     * @param loginUser login user
+     * @return zookeeper information list
+     */
+    Map<String,Object> queryZookeeperState(User loginUser);
+    
+    /**
+     * query worker list
+     *
+     * @param loginUser login user
+     * @return worker information list
+     */
+    Map<String,Object> queryWorker(User loginUser);
+    
+    List<Server> getServerListFromZK(boolean isMaster);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 6458a76..914eb2d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -17,157 +17,26 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
-import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
-import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
-import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
-
-import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
-import org.apache.dolphinscheduler.api.dto.gantt.Task;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DependResult;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.graph.DAG;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
-import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessData;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
  * process instance service
  */
-@Service
-public class ProcessInstanceService extends BaseService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class);
-
-    @Autowired
-    ProjectMapper projectMapper;
-
-    @Autowired
-    ProjectService projectService;
-
-    @Autowired
-    ProcessService processService;
-
-    @Autowired
-    ProcessInstanceMapper processInstanceMapper;
-
-    @Autowired
-    ProcessDefinitionMapper processDefineMapper;
-
-    @Autowired
-    ProcessDefinitionService processDefinitionService;
-
-    @Autowired
-    ProcessDefinitionVersionService processDefinitionVersionService;
-
-    @Autowired
-    ExecutorService execService;
-
-    @Autowired
-    TaskInstanceMapper taskInstanceMapper;
-
-    @Autowired
-    LoggerService loggerService;
-
-
-    @Autowired
-    UsersService usersService;
+public interface ProcessInstanceService {
 
     /**
      * return top n SUCCESS process instance order by running time which started between startTime and endTime
      */
-    public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
-        Map<String, Object> result = new HashMap<>();
-
-        Project project = projectMapper.queryByName(projectName);
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        if (0 > size) {
-            putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
-            return result;
-        }
-        if (Objects.isNull(startTime)) {
-            putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
-            return result;
-        }
-        Date start = DateUtils.stringToDate(startTime);
-        if (Objects.isNull(endTime)) {
-            putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
-            return result;
-        }
-        Date end = DateUtils.stringToDate(endTime);
-        if (start == null || end == null) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
-            return result;
-        }
-        if (start.getTime() > end.getTime()) {
-            putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);
-            return result;
-        }
-
-        List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
-        result.put(DATA_LIST, processInstances);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime);
 
     /**
      * query process instance by id
@@ -177,24 +46,7 @@ public class ProcessInstanceService extends BaseService {
      * @param processId process instance id
      * @return process instance detail
      */
-    public Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
-
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
-        processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
-        result.put(DATA_LIST, processInstance);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId);
 
     /**
      * paging query process instance list, filtering according to project, process definition, time range, keyword, process status
@@ -211,64 +63,10 @@ public class ProcessInstanceService extends BaseService {
      * @param endDate end time
      * @return process instance list
      */
-    public Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
-                                                        String startDate, String endDate,
-                                                        String searchVal, String executorName, ExecutionStatus stateType, String host,
-                                                        Integer pageNo, Integer pageSize) {
-
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        int[] statusArray = null;
-        // filter by state
-        if (stateType != null) {
-            statusArray = new int[]{stateType.ordinal()};
-        }
-
-        Date start = null;
-        Date end = null;
-        try {
-            if (StringUtils.isNotEmpty(startDate)) {
-                start = DateUtils.getScheduleDate(startDate);
-            }
-            if (StringUtils.isNotEmpty(endDate)) {
-                end = DateUtils.getScheduleDate(endDate);
-            }
-        } catch (Exception e) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
-            return result;
-        }
-
-        Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
-        PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
-        int executorId = usersService.getUserIdByName(executorName);
-
-        IPage<ProcessInstance> processInstanceList =
-                processInstanceMapper.queryProcessInstanceListPaging(page,
-                        project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
-
-        List<ProcessInstance> processInstances = processInstanceList.getRecords();
-
-        for (ProcessInstance processInstance : processInstances) {
-            processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));
-            User executor = usersService.queryUser(processInstance.getExecutorId());
-            if (null != executor) {
-                processInstance.setExecutorName(executor.getUserName());
-            }
-        }
-
-        pageInfo.setTotalCount((int) processInstanceList.getTotal());
-        pageInfo.setLists(processInstances);
-        result.put(DATA_LIST, pageInfo);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
+                                                 String startDate, String endDate,
+                                                 String searchVal, String executorName, ExecutionStatus stateType, String host,
+                                                 Integer pageNo, Integer pageSize);
 
     /**
      * query task list by process instance id
@@ -279,71 +77,9 @@ public class ProcessInstanceService extends BaseService {
      * @return task list for the process instance
      * @throws IOException io exception
      */
-    public Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
-        List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
-        addDependResultForTaskList(taskInstanceList);
-        Map<String, Object> resultMap = new HashMap<>();
-        resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
-        resultMap.put(TASK_LIST, taskInstanceList);
-        result.put(DATA_LIST, resultMap);
-
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * add dependent result for dependent task
-     */
-    private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
-        for (TaskInstance taskInstance : taskInstanceList) {
-            if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
-                Result<String> logResult = loggerService.queryLog(
-                        taskInstance.getId(), 0, 4098);
-                if (logResult.getCode() == Status.SUCCESS.ordinal()) {
-                    String log = logResult.getData();
-                    Map<String, DependResult> resultMap = parseLogForDependentResult(log);
-                    taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap));
-                }
-            }
-        }
-    }
+    Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException;
 
-    public Map<String, DependResult> parseLogForDependentResult(String log) throws IOException {
-        Map<String, DependResult> resultMap = new HashMap<>();
-        if (StringUtils.isEmpty(log)) {
-            return resultMap;
-        }
-
-        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(log.getBytes(
-                StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
-        String line;
-        while ((line = br.readLine()) != null) {
-            if (line.contains(DEPENDENT_SPLIT)) {
-                String[] tmpStringArray = line.split(":\\|\\|");
-                if (tmpStringArray.length != 2) {
-                    continue;
-                }
-                String dependResultString = tmpStringArray[1];
-                String[] dependStringArray = dependResultString.split(",");
-                if (dependStringArray.length != 2) {
-                    continue;
-                }
-                String key = dependStringArray[0].trim();
-                DependResult dependResult = DependResult.valueOf(dependStringArray[1].trim());
-                resultMap.put(key, dependResult);
-            }
-        }
-        return resultMap;
-    }
+    Map<String, DependResult> parseLogForDependentResult(String log) throws IOException;
 
     /**
      * query sub process instance detail info by task id
@@ -353,38 +89,7 @@ public class ProcessInstanceService extends BaseService {
      * @param taskId task id
      * @return sub process instance detail
      */
-    public Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
-        if (taskInstance == null) {
-            putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
-            return result;
-        }
-        if (!taskInstance.isSubProcess()) {
-            putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
-            return result;
-        }
-
-        ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
-                taskInstance.getProcessInstanceId(), taskInstance.getId());
-        if (subWorkflowInstance == null) {
-            putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
-            return result;
-        }
-        Map<String, Object> dataMap = new HashMap<>();
-        dataMap.put("subProcessInstanceId", subWorkflowInstance.getId());
-        result.put(DATA_LIST, dataMap);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId);
 
     /**
      * update process instance
@@ -401,97 +106,9 @@ public class ProcessInstanceService extends BaseService {
      * @return update result code
      * @throws ParseException parse exception for json parse
      */
-    public Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
-                                                     String processInstanceJson, String scheduleTime, Boolean syncDefine,
-                                                     Flag flag, String locations, String connects) throws ParseException {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        //check project permission
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        //check process instance exists
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (processInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
-            return result;
-        }
-
-        //check process instance status
-        if (!processInstance.getState().typeIsFinished()) {
-            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
-                    processInstance.getName(), processInstance.getState().toString(), "update");
-            return result;
-        }
-        Date schedule = null;
-        schedule = processInstance.getScheduleTime();
-        if (scheduleTime != null) {
-            schedule = DateUtils.getScheduleDate(scheduleTime);
-        }
-        processInstance.setScheduleTime(schedule);
-        processInstance.setLocations(locations);
-        processInstance.setConnects(connects);
-        String globalParams = null;
-        String originDefParams = null;
-        int timeout = processInstance.getTimeout();
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
-        if (StringUtils.isNotEmpty(processInstanceJson)) {
-            ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
-            //check workflow json is valid
-            Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
-            if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
-                return result;
-            }
-
-            originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
-            List<Property> globalParamList = processData.getGlobalParams();
-            Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-            globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
-                    processInstance.getCmdTypeIfComplement(), schedule);
-            timeout = processData.getTimeout();
-            processInstance.setTimeout(timeout);
-            Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
-                    processDefinition.getUserId());
-            if (tenant != null) {
-                processInstance.setTenantCode(tenant.getTenantCode());
-            }
-            // get the processinstancejson before saving,and then save the name and taskid
-            String oldJson = processInstance.getProcessInstanceJson();
-            if (StringUtils.isNotEmpty(oldJson)) {
-                processInstanceJson = processService.changeJson(processData,oldJson);
-            }
-            processInstance.setProcessInstanceJson(processInstanceJson);
-            processInstance.setGlobalParams(globalParams);
-        }
-
-        int update = processService.updateProcessInstance(processInstance);
-        int updateDefine = 1;
-        if (Boolean.TRUE.equals(syncDefine)) {
-            processDefinition.setProcessDefinitionJson(processInstanceJson);
-            processDefinition.setGlobalParams(originDefParams);
-            processDefinition.setLocations(locations);
-            processDefinition.setConnects(connects);
-            processDefinition.setTimeout(timeout);
-            processDefinition.setUpdateTime(new Date());
-
-            // add process definition version
-            long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
-            processDefinition.setVersion(version);
-            updateDefine = processDefineMapper.updateById(processDefinition);
-        }
-        if (update > 0 && updateDefine > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
-        }
-
-        return result;
-
-    }
+    Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
+                                              String processInstanceJson, String scheduleTime, Boolean syncDefine,
+                                              Flag flag, String locations, String connects) throws ParseException;
 
     /**
      * query parent process instance detail info by sub process instance id
@@ -501,37 +118,7 @@ public class ProcessInstanceService extends BaseService {
      * @param subId sub process id
      * @return parent instance detail
      */
-    public Map<String, Object> queryParentInstanceBySubId(User loginUser, String projectName, Integer subId) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-
-        ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
-        if (subInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
-            return result;
-        }
-        if (subInstance.getIsSubProcess() == Flag.NO) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
-            return result;
-        }
-
-        ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId);
-        if (parentWorkflowInstance == null) {
-            putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST);
-            return result;
-        }
-        Map<String, Object> dataMap = new HashMap<>();
-        dataMap.put("parentWorkflowInstance", parentWorkflowInstance.getId());
-        result.put(DATA_LIST, dataMap);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> queryParentInstanceBySubId(User loginUser, String projectName, Integer subId);
 
     /**
      * delete process instance by id, at the same time,delete task instance and their mapping relation data
@@ -541,38 +128,7 @@ public class ProcessInstanceService extends BaseService {
      * @param processInstanceId process instance id
      * @return delete result code
      */
-    @Transactional(rollbackFor = RuntimeException.class)
-    public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
-
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
-        if (resultEnum != Status.SUCCESS) {
-            return checkResult;
-        }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (null == processInstance) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
-            return result;
-        }
-
-        processService.removeTaskLogFile(processInstanceId);
-        // delete database cascade
-        int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
-
-        processService.deleteAllSubWorkProcessByParentId(processInstanceId);
-        processService.deleteWorkProcessMapByParentId(processInstanceId);
-
-        if (delete > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
-        }
-
-        return result;
-    }
+    Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId);
 
     /**
      * view process instance variables
@@ -580,71 +136,7 @@ public class ProcessInstanceService extends BaseService {
      * @param processInstanceId process instance id
      * @return variables data
      */
-    public Map<String, Object> viewVariables(Integer processInstanceId) {
-        Map<String, Object> result = new HashMap<>();
-
-        ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
-
-        if (processInstance == null) {
-            throw new RuntimeException("workflow instance is null");
-        }
-
-        Map<String, String> timeParams = BusinessTimeUtils
-                .getBusinessTime(processInstance.getCmdTypeIfComplement(),
-                        processInstance.getScheduleTime());
-
-        String workflowInstanceJson = processInstance.getProcessInstanceJson();
-
-        ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class);
-
-        String userDefinedParams = processInstance.getGlobalParams();
-
-        // global params
-        List<Property> globalParams = new ArrayList<>();
-
-        if (userDefinedParams != null && userDefinedParams.length() > 0) {
-            globalParams = JSONUtils.toList(userDefinedParams, Property.class);
-        }
-
-        List<TaskNode> taskNodeList = workflowData.getTasks();
-
-        // global param string
-        String globalParamStr = JSONUtils.toJsonString(globalParams);
-        globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams);
-        globalParams = JSONUtils.toList(globalParamStr, Property.class);
-        for (Property property : globalParams) {
-            timeParams.put(property.getProp(), property.getValue());
-        }
-
-        // local params
-        Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
-        for (TaskNode taskNode : taskNodeList) {
-            String parameter = taskNode.getParams();
-            Map<String, String> map = JSONUtils.toMap(parameter);
-            String localParams = map.get(LOCAL_PARAMS);
-            if (localParams != null && !localParams.isEmpty()) {
-                localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams);
-                List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
-
-                Map<String, Object> localParamsMap = new HashMap<>();
-                localParamsMap.put("taskType", taskNode.getType());
-                localParamsMap.put("localParamsList", localParamsList);
-                if (CollectionUtils.isNotEmpty(localParamsList)) {
-                    localUserDefParams.put(taskNode.getName(), localParamsMap);
-                }
-            }
-
-        }
-
-        Map<String, Object> resultMap = new HashMap<>();
-
-        resultMap.put(GLOBAL_PARAMS, globalParams);
-        resultMap.put(LOCAL_PARAMS, localUserDefParams);
-
-        result.put(DATA_LIST, resultMap);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> viewVariables(Integer processInstanceId);
 
     /**
      * encapsulation gantt structure
@@ -653,67 +145,7 @@ public class ProcessInstanceService extends BaseService {
      * @return gantt tree data
      * @throws Exception exception when json parse
      */
-    public Map<String, Object> viewGantt(Integer processInstanceId) throws Exception {
-        Map<String, Object> result = new HashMap<>();
-
-        ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
-
-        if (processInstance == null) {
-            throw new RuntimeException("workflow instance is null");
-        }
-
-        GanttDto ganttDto = new GanttDto();
-
-        DAG<String, TaskNode, TaskNodeRelation> dag = processInstance2DAG(processInstance);
-        //topological sort
-        List<String> nodeList = dag.topologicalSort();
-
-        ganttDto.setTaskNames(nodeList);
-
-        List<Task> taskList = new ArrayList<>();
-        for (String node : nodeList) {
-            TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node);
-            if (taskInstance == null) {
-                continue;
-            }
-            Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
-            Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
-            Task task = new Task();
-            task.setTaskName(taskInstance.getName());
-            task.getStartDate().add(startTime.getTime());
-            task.getEndDate().add(endTime.getTime());
-            task.setIsoStart(startTime);
-            task.setIsoEnd(endTime);
-            task.setStatus(taskInstance.getState().toString());
-            task.setExecutionDate(taskInstance.getStartTime());
-            task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
-            taskList.add(task);
-        }
-        ganttDto.setTasks(taskList);
-
-        result.put(DATA_LIST, ganttDto);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * process instance to DAG
-     *
-     * @param processInstance input process instance
-     * @return process instance dag.
-     */
-    private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
-
-        String processDefinitionJson = processInstance.getProcessInstanceJson();
-
-        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
-
-        List<TaskNode> taskNodeList = processData.getTasks();
-
-        ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
-
-        return DagHelper.buildDagGraph(processDag);
-    }
+    Map<String, Object> viewGantt(Integer processInstanceId) throws Exception;
 
     /**
      * query process instance by processDefinitionId and stateArray
@@ -721,9 +153,7 @@ public class ProcessInstanceService extends BaseService {
      * @param states states array
      * @return process instance list
      */
-    public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
-        return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
-    }
+    List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states);
 
     /**
      * query process instance by processDefinitionId
@@ -731,8 +161,6 @@ public class ProcessInstanceService extends BaseService {
      * @param size size
      * @return process instance list
      */
-    public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size) {
-        return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
-    }
+    List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size);
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
index 23de453..24f6189 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
@@ -14,43 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.api.service;
 
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
  * queue service
  */
-@Service
-public class QueueService extends BaseService {
-
-    private static final Logger logger = LoggerFactory.getLogger(QueueService.class);
-
-    @Autowired
-    private QueueMapper queueMapper;
-
-    @Autowired
-    private UserMapper userMapper;
+public interface QueueService {
 
     /**
      * query queue list
@@ -58,18 +33,7 @@ public class QueueService extends BaseService {
      * @param loginUser login user
      * @return queue list
      */
-    public Map<String, Object> queryList(User loginUser) {
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-
-        List<Queue> queueList = queueMapper.selectList(null);
-        result.put(Constants.DATA_LIST, queueList);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryList(User loginUser);
 
     /**
      * query queue list paging
@@ -80,26 +44,7 @@ public class QueueService extends BaseService {
      * @param pageSize page size
      * @return queue list
      */
-    public Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-
-        Page<Queue> page = new Page(pageNo, pageSize);
-
-
-        IPage<Queue> queueList = queueMapper.queryQueuePaging(page, searchVal);
-
-        Integer count = (int) queueList.getTotal();
-        PageInfo<Queue> pageInfo = new PageInfo<>(pageNo, pageSize);
-        pageInfo.setTotalCount(count);
-        pageInfo.setLists(queueList.getRecords());
-        result.put(Constants.DATA_LIST, pageInfo);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
 
     /**
      * create queue
@@ -109,45 +54,7 @@ public class QueueService extends BaseService {
      * @param queueName queue name
      * @return create result
      */
-    public Map<String, Object> createQueue(User loginUser, String queue, String queueName) {
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-
-        if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
-            return result;
-        }
-
-        if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
-            return result;
-        }
-
-        if (checkQueueNameExist(queueName)) {
-            putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
-            return result;
-        }
-
-        if (checkQueueExist(queue)) {
-            putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
-            return result;
-        }
-
-        Queue queueObj = new Queue();
-        Date now = new Date();
-
-        queueObj.setQueue(queue);
-        queueObj.setQueueName(queueName);
-        queueObj.setCreateTime(now);
-        queueObj.setUpdateTime(now);
-
-        queueMapper.insert(queueObj);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> createQueue(User loginUser, String queue, String queueName);
 
     /**
      * update queue
@@ -158,66 +65,7 @@ public class QueueService extends BaseService {
      * @param queueName queue name
      * @return update result code
      */
-    public Map<String, Object> updateQueue(User loginUser, int id, String queue, String queueName) {
-        Map<String, Object> result = new HashMap<>();
-        if (isNotAdmin(loginUser, result)) {
-            return result;
-        }
-
-        if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
-            return result;
-        }
-
-        if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
-            return result;
-        }
-
-        Queue queueObj = queueMapper.selectById(id);
-        if (queueObj == null) {
-            putMsg(result, Status.QUEUE_NOT_EXIST, id);
-            return result;
-        }
-
-        // whether queue value or queueName is changed
-        if (queue.equals(queueObj.getQueue()) && queueName.equals(queueObj.getQueueName())) {
-            putMsg(result, Status.NEED_NOT_UPDATE_QUEUE);
-            return result;
-        }
-
-        // check queue name is exist
-        if (!queueName.equals(queueObj.getQueueName())
-                && checkQueueNameExist(queueName)) {
-            putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
-            return result;
-        }
-
-        // check queue value is exist
-        if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) {
-            putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
-            return result;
-        }
-
-        // check old queue using by any user
-        if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) {
-            //update user related old queue
-            Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName);
-            logger.info("old queue have related {} user, exec update user success.", relatedUserNums);
-        }
-
-        // update queue
-        Date now = new Date();
-        queueObj.setQueue(queue);
-        queueObj.setQueueName(queueName);
-        queueObj.setUpdateTime(now);
-
-        queueMapper.updateById(queueObj);
-
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> updateQueue(User loginUser, int id, String queue, String queueName);
 
     /**
      * verify queue and queueName
@@ -226,69 +74,6 @@ public class QueueService extends BaseService {
      * @param queueName queue name
      * @return true if the queue name not exists, otherwise return false
      */
-    public Result verifyQueue(String queue, String queueName) {
-        Result result = new Result();
-
-        if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
-            return result;
-        }
-
-        if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
-            return result;
-        }
-
-
-        if (checkQueueNameExist(queueName)) {
-            logger.error("queue name {} has exist, can't create again.", queueName);
-            putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
-            return result;
-        }
-
-        if (checkQueueExist(queue)) {
-            logger.error("queue value {} has exist, can't create again.", queue);
-            putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
-            return result;
-        }
-
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * check queue exist
-     * if exists return true,not exists return false
-     * check queue exist
-     *
-     * @param queue queue
-     * @return true if the queue not exists, otherwise return false
-     */
-    private boolean checkQueueExist(String queue) {
-        return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null));
-    }
-
-    /**
-     * check queue name exist
-     * if exists return true,not exists return false
-     *
-     * @param queueName queue name
-     * @return true if the queue name not exists, otherwise return false
-     */
-    private boolean checkQueueNameExist(String queueName) {
-        return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName));
-    }
-
-    /**
-     * check old queue name using by any user
-     * if need to update user
-     *
-     * @param oldQueue old queue name
-     * @param newQueue new queue name
-     * @return true if need to update user
-     */
-    private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) {
-        return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue));
-    }
+    Result<Object> verifyQueue(String queue, String queueName);
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
new file mode 100644
index 0000000..c2eb9ee
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.service.impl;
+
+import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.WorkerServerModel;
+import org.apache.dolphinscheduler.dao.MonitorDBDao;
+import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.google.common.collect.Sets;
+
+/**
+ * monitor service impl
+ */
+@Service
+public class MonitorServiceImpl extends BaseService implements MonitorService {
+
+    @Autowired
+    private ZookeeperMonitor zookeeperMonitor;
+
+    @Autowired
+    private MonitorDBDao monitorDBDao;
+
+    /**
+     * query database state
+     *
+     * @param loginUser login user
+     * @return database state
+     */
+    public Map<String,Object> queryDatabaseState(User loginUser) {
+        Map<String, Object> result = new HashMap<>();
+
+        List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
+
+        result.put(Constants.DATA_LIST, monitorRecordList);
+        putMsg(result, Status.SUCCESS);
+
+        return result;
+
+    }
+
+    /**
+     * query master list
+     *
+     * @param loginUser login user
+     * @return master information list
+     */
+    public Map<String,Object> queryMaster(User loginUser) {
+
+        Map<String, Object> result = new HashMap<>();
+
+        List<Server> masterServers = getServerListFromZK(true);
+        result.put(Constants.DATA_LIST, masterServers);
+        putMsg(result,Status.SUCCESS);
+
+        return result;
+    }
+
+    /**
+     * query zookeeper state
+     *
+     * @param loginUser login user
+     * @return zookeeper information list
+     */
+    public Map<String,Object> queryZookeeperState(User loginUser) {
+        Map<String, Object> result = new HashMap<>();
+
+        List<ZookeeperRecord> zookeeperRecordList = zookeeperMonitor.zookeeperInfoList();
+
+        result.put(Constants.DATA_LIST, zookeeperRecordList);
+        putMsg(result, Status.SUCCESS);
+
+        return result;
+
+    }
+
+    /**
+     * query worker list
+     *
+     * @param loginUser login user
+     * @return worker information list
+     */
+    public Map<String,Object> queryWorker(User loginUser) {
+
+        Map<String, Object> result = new HashMap<>();
+        List<WorkerServerModel> workerServers = getServerListFromZK(false)
+                .stream()
+                .map((Server server) -> {
+                    WorkerServerModel model = new WorkerServerModel();
+                    model.setId(server.getId());
+                    model.setHost(server.getHost());
+                    model.setPort(server.getPort());
+                    model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
+                    model.setResInfo(server.getResInfo());
+                    model.setCreateTime(server.getCreateTime());
+                    model.setLastHeartbeatTime(server.getLastHeartbeatTime());
+                    return model;
+                })
+                .collect(Collectors.toList());
+
+        Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
+                .stream()
+                .collect(Collectors.toMap(
+                    (WorkerServerModel worker) -> {
+                        String[] s = worker.getZkDirectories().iterator().next().split("/");
+                        return s[s.length - 1];
+                    }
+                    , Function.identity()
+                    , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
+                        oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
+                        return oldOne;
+                    }));
+
+        result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
+        putMsg(result,Status.SUCCESS);
+
+        return result;
+    }
+
+    public List<Server> getServerListFromZK(boolean isMaster) {
+
+        checkNotNull(zookeeperMonitor);
+        ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
+        return zookeeperMonitor.getServersList(zkNodeType);
+    }
+
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
similarity index 95%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 6458a76..926e3b0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
 
 import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
 import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
@@ -27,6 +27,14 @@ import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
 import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
 import org.apache.dolphinscheduler.api.dto.gantt.Task;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.LoggerService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
+import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
+import org.apache.dolphinscheduler.api.service.UsersService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -75,8 +83,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -85,13 +91,10 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
- * process instance service
+ * process instance service impl
  */
 @Service
-public class ProcessInstanceService extends BaseService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class);
+public class ProcessInstanceServiceImpl extends BaseService implements ProcessInstanceService {
 
     @Autowired
     ProjectMapper projectMapper;
@@ -155,7 +158,7 @@ public class ProcessInstanceService extends BaseService {
         }
         Date end = DateUtils.stringToDate(endTime);
         if (start == null || end == null) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
             return result;
         }
         if (start.getTime() > end.getTime()) {
@@ -241,12 +244,12 @@ public class ProcessInstanceService extends BaseService {
                 end = DateUtils.getScheduleDate(endDate);
             }
         } catch (Exception e) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
             return result;
         }
 
         Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
-        PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
+        PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
         int executorId = usersService.getUserIdByName(executorName);
 
         IPage<ProcessInstance> processInstanceList =
@@ -307,7 +310,7 @@ public class ProcessInstanceService extends BaseService {
         for (TaskInstance taskInstance : taskInstanceList) {
             if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
                 Result<String> logResult = loggerService.queryLog(
-                        taskInstance.getId(), 0, 4098);
+                        taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
                 if (logResult.getCode() == Status.SUCCESS.ordinal()) {
                     String log = logResult.getData();
                     Map<String, DependResult> resultMap = parseLogForDependentResult(log);
@@ -380,7 +383,7 @@ public class ProcessInstanceService extends BaseService {
             return result;
         }
         Map<String, Object> dataMap = new HashMap<>();
-        dataMap.put("subProcessInstanceId", subWorkflowInstance.getId());
+        dataMap.put(Constants.SUBPROCESS_INSTANCE_ID, subWorkflowInstance.getId());
         result.put(DATA_LIST, dataMap);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -527,7 +530,7 @@ public class ProcessInstanceService extends BaseService {
             return result;
         }
         Map<String, Object> dataMap = new HashMap<>();
-        dataMap.put("parentWorkflowInstance", parentWorkflowInstance.getId());
+        dataMap.put(Constants.PARENT_WORKFLOW_INSTANCE, parentWorkflowInstance.getId());
         result.put(DATA_LIST, dataMap);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -627,8 +630,8 @@ public class ProcessInstanceService extends BaseService {
                 List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
 
                 Map<String, Object> localParamsMap = new HashMap<>();
-                localParamsMap.put("taskType", taskNode.getType());
-                localParamsMap.put("localParamsList", localParamsList);
+                localParamsMap.put(Constants.TASK_TYPE, taskNode.getType());
+                localParamsMap.put(Constants.LOCAL_PARAMS_LIST, localParamsList);
                 if (CollectionUtils.isNotEmpty(localParamsList)) {
                     localUserDefParams.put(taskNode.getName(), localParamsMap);
                 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
similarity index 92%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index 23de453..5a2e943 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -14,37 +14,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.api.service;
+
+package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.QueueService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
 /**
- * queue service
+ * queue service impl
  */
 @Service
-public class QueueService extends BaseService {
+public class QueueServiceImpl extends BaseService implements QueueService {
 
-    private static final Logger logger = LoggerFactory.getLogger(QueueService.class);
+    private static final Logger logger = LoggerFactory.getLogger(QueueServiceImpl.class);
 
     @Autowired
     private QueueMapper queueMapper;
@@ -86,8 +91,7 @@ public class QueueService extends BaseService {
             return result;
         }
 
-        Page<Queue> page = new Page(pageNo, pageSize);
-
+        Page<Queue> page = new Page<>(pageNo, pageSize);
 
         IPage<Queue> queueList = queueMapper.queryQueuePaging(page, searchVal);
 
@@ -116,12 +120,12 @@ public class QueueService extends BaseService {
         }
 
         if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
             return result;
         }
 
         if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
             return result;
         }
 
@@ -165,12 +169,12 @@ public class QueueService extends BaseService {
         }
 
         if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
             return result;
         }
 
         if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
             return result;
         }
 
@@ -226,28 +230,25 @@ public class QueueService extends BaseService {
      * @param queueName queue name
      * @return true if the queue name not exists, otherwise return false
      */
-    public Result verifyQueue(String queue, String queueName) {
-        Result result = new Result();
+    public Result<Object> verifyQueue(String queue, String queueName) {
+        Result<Object> result = new Result<>();
 
         if (StringUtils.isEmpty(queue)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
             return result;
         }
 
         if (StringUtils.isEmpty(queueName)) {
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
             return result;
         }
 
-
         if (checkQueueNameExist(queueName)) {
-            logger.error("queue name {} has exist, can't create again.", queueName);
             putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
             return result;
         }
 
         if (checkQueueExist(queue)) {
-            logger.error("queue value {} has exist, can't create again.", queue);
             putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
             return result;
         }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
index b155d59..02cbd79 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.MonitorServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -43,7 +44,8 @@ public class MonitorServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(MonitorServiceTest.class);
 
     @InjectMocks
-    private MonitorService monitorService;
+    private MonitorServiceImpl monitorService;
+
     @Mock
     private MonitorDBDao monitorDBDao;
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index de23d75..e39f979 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -68,7 +69,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 public class ProcessInstanceServiceTest {
 
     @InjectMocks
-    ProcessInstanceService processInstanceService;
+    ProcessInstanceServiceImpl processInstanceService;
 
     @Mock
     ProjectMapper projectMapper;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index dbae95b..5bb44a2 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.QueueServiceImpl;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -50,7 +51,7 @@ public class QueueServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(QueueServiceTest.class);
 
     @InjectMocks
-    private QueueService queueService;
+    private QueueServiceImpl queueService;
     @Mock
     private QueueMapper queueMapper;
     @Mock
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index d2855e0..0297ae6 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -752,9 +752,17 @@ public final class Constants {
     public static final String SUBTRACT_STRING = "-";
     public static final String GLOBAL_PARAMS = "globalParams";
     public static final String LOCAL_PARAMS = "localParams";
+    public static final String LOCAL_PARAMS_LIST = "localParamsList";
+    public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId";
     public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
+    public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
+    public static final String TASK_TYPE = "taskType";
     public static final String TASK_LIST = "taskList";
     public static final String RWXR_XR_X = "rwxr-xr-x";
+    public static final String QUEUE = "queue";
+    public static final String QUEUE_NAME = "queueName";
+    public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0;
+    public static final int LOG_QUERY_LIMIT = 4096;
 
     /**
      * master/worker server use for zk
@@ -1010,11 +1018,13 @@ public final class Constants {
      */
     public static final String PLUGIN_JAR_SUFFIX = ".jar";
 
-    public static final int NORAML_NODE_STATUS = 0;
+    public static final int NORMAL_NODE_STATUS = 0;
     public static final int ABNORMAL_NODE_STATUS = 1;
 
     public static final String START_TIME = "start time";
     public static final String END_TIME = "end time";
+    public static final String START_END_DATE = "startDate,endDate";
+
     /**
      * system line separator
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index bd8c79c..b89d851 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.registry;
 
 import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
 
-import java.util.Date;
-import java.util.Set;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
+
+import java.util.Date;
+import java.util.Set;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +57,7 @@ public class HeartBeatTask extends Thread {
             double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
             double loadAverage = OSUtils.loadAverage();
 
-            int status = Constants.NORAML_NODE_STATUS;
+            int status = Constants.NORMAL_NODE_STATUS;
 
             if (availablePhysicalMemorySize < reservedMemory
                     || loadAverage > maxCpuloadAvg) {