You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/14 08:18:16 UTC

[dolphinscheduler] branch dev updated: Make LogServiceClient Singleton (#11777)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d3a77c68e6 Make LogServiceClient Singleton (#11777)
d3a77c68e6 is described below

commit d3a77c68e62b0c471e25cd6a4bfbd2c561dd2023
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Sep 14 16:18:08 2022 +0800

    Make LogServiceClient Singleton (#11777)
---
 .../api/exceptions/ServiceException.java           |  36 ++---
 .../api/service/impl/ExecutorServiceImpl.java      |  10 +-
 .../api/service/impl/LoggerServiceImpl.java        |  19 +--
 .../service/impl/ProcessInstanceServiceImpl.java   | 105 ++++++--------
 .../api/service/impl/SchedulerServiceImpl.java     |  10 +-
 .../api/exceptions/ServiceExceptionTest.java       |  14 +-
 .../api/service/ExecutorServiceTest.java           |   3 +-
 .../api/service/LoggerServiceTest.java             |  49 +++----
 .../api/service/ProcessDefinitionServiceTest.java  |   2 +-
 .../api/service/ProcessInstanceServiceTest.java    |  38 +++--
 .../api/service/TaskInstanceServiceTest.java       |   3 +-
 .../server/log/LoggerRequestProcessor.java         |   6 +-
 .../master/service/MasterFailoverService.java      |  12 +-
 .../master/service/WorkerFailoverService.java      |  31 ++---
 .../master/utils/DataQualityResultOperator.java    |   3 +-
 .../master/registry/MasterRegistryClientTest.java  |  17 +--
 .../server/master/service/FailoverServiceTest.java |  18 ++-
 .../remote/NettyRemotingClient.java                | 104 ++++----------
 .../remote/config/NettyClientConfig.java           |  64 ++-------
 .../server/utils/ProcessUtils.java                 |  13 +-
 .../log/{LogClientService.java => LogClient.java}  | 155 ++++++++++-----------
 .../dolphinscheduler/service/log/LogPromise.java   | 108 --------------
 .../service/process/ProcessService.java            |   3 +-
 .../service/process/ProcessServiceImpl.java        | 102 +++++++-------
 ...ogClientServiceTest.java => LogClientTest.java} |  33 ++---
 .../plugin/task/api/AbstractRemoteTask.java        |   4 +-
 .../plugin/task/api/AbstractYarnTask.java          |  20 +--
 .../plugin/task/api/utils/LogUtils.java            |  12 +-
 .../plugin/task/api/utils/LogUtilsTest.java        |   8 +-
 .../plugin/task/dinky/DinkyTask.java               |  39 +++---
 .../plugin/task/emr/EmrAddStepsTask.java           |  50 +++----
 .../plugin/task/emr/EmrJobFlowTask.java            |   6 +-
 .../plugin/task/flink/FlinkStreamTask.java         |   8 +-
 .../plugin/task/flink/FlinkTask.java               |  47 +------
 .../plugin/task/hivecli/HiveCliTask.java           |   6 +-
 .../plugin/task/jupyter/JupyterTask.java           |  10 +-
 .../dolphinscheduler/plugin/task/k8s/K8sTask.java  |   8 +-
 .../plugin/task/pigeon/PigeonTask.java             |  45 +++---
 .../plugin/task/sagemaker/SagemakerTask.java       |   6 +-
 .../plugin/task/seatunnel/SeatunnelTask.java       |  22 +--
 .../plugin/task/zeppelin/ZeppelinTask.java         |  14 +-
 .../server/worker/processor/TaskKillProcessor.java |  49 ++++---
 .../worker/runner/WorkerTaskExecuteRunnable.java   |   8 +-
 43 files changed, 518 insertions(+), 802 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
index 369fbbec3e..2fa3e01a1c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
@@ -20,48 +20,36 @@ import org.apache.dolphinscheduler.api.enums.Status;
 
 import java.text.MessageFormat;
 
+import lombok.Data;
 
-/**
- * service exception
- */
+@Data
 public class ServiceException extends RuntimeException {
 
-    /**
-     * code
-     */
-    private Integer code;
+    private int code;
 
     public ServiceException() {
+        this(Status.INTERNAL_SERVER_ERROR_ARGS);
     }
 
     public ServiceException(Status status) {
-        super(status.getMsg());
-        this.code = status.getCode();
+        this(status.getCode(), status.getMsg());
     }
 
     public ServiceException(Status status, Object... formatter) {
-        super(MessageFormat.format(status.getMsg(), formatter));
-        this.code = status.getCode();
-    }
-
-    public ServiceException(Integer code,String message) {
-        super(message);
-        this.code = code;
+        this(status.getCode(), MessageFormat.format(status.getMsg(), formatter));
     }
 
     public ServiceException(String message) {
-        super(message);
+        this(Status.INTERNAL_SERVER_ERROR_ARGS, message);
     }
 
-    public ServiceException(String message, Exception cause) {
-        super(message, cause);
+    public ServiceException(int code, String message) {
+        this(code, message, null);
     }
 
-    public Integer getCode() {
-        return this.code;
-    }
-
-    public void setCode(Integer code) {
+    public ServiceException(int code, String message, Exception cause) {
+        super(message, cause);
         this.code = code;
     }
+
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index b4fb3fb8aa..20b2d9604c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -31,6 +31,7 @@ import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LEN
 import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.MonitorService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -366,11 +367,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
             return result;
         }
 
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (processInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
-            return result;
-        }
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
 
         ProcessDefinition processDefinition =
                 processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@@ -1022,7 +1020,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      */
     @Override
     public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null);
         if (processInstance == null) {
             return null;
         }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 39f9208b5f..518a03004a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.commons.lang3.StringUtils;
@@ -66,7 +66,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
     @Autowired
     private ProcessService processService;
 
-    private LogClientService logClient;
+    @Autowired
+    private LogClient logClient;
 
     @Autowired
     ProjectMapper projectMapper;
@@ -77,20 +78,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
     @Autowired
     TaskDefinitionMapper taskDefinitionMapper;
 
-    @PostConstruct
-    public void init() {
-        if (Objects.isNull(this.logClient)) {
-            this.logClient = new LogClientService();
-        }
-    }
-
-    @PreDestroy
-    public void close() {
-        if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) {
-            logClient.close();
-        }
-    }
-
     /**
      * view log
      *
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index d6c6cc329f..cf98c5a3b4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -17,17 +17,10 @@
 
 package org.apache.dolphinscheduler.api.service.impl;
 
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
-import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
-import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
-import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
-import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
-
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
 import org.apache.dolphinscheduler.api.dto.gantt.Task;
 import org.apache.dolphinscheduler.api.enums.Status;
@@ -76,9 +69,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -95,12 +88,18 @@ import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
 
 /**
  * process instance service impl
@@ -226,7 +225,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
+                .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
 
         ProcessDefinition processDefinition =
                 processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@@ -339,17 +339,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
     public Map<String, Object> queryTaskListByProcessId(User loginUser, long projectCode,
                                                         Integer processId) throws IOException {
         Project project = projectMapper.queryByCode(projectCode);
-        // check user access for project
-        Map<String, Object> result =
-                projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
-        ProcessDefinition processDefinition =
-                processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
+                .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
+        ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
         if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
+            putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
             return result;
         }
         List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
@@ -481,28 +480,23 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
                                                      String globalParams,
                                                      String locations, int timeout, String tenantCode) {
         Project project = projectMapper.queryByCode(projectCode);
-        // check user access for project
-        Map<String, Object> result =
-                projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
+        //check user access for project
+        Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        // check process instance exists
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (processInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
-            return result;
-        }
-        // check process instance exists in project
-        ProcessDefinition processDefinition0 =
-                processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+        //check process instance exists
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+                .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+        //check process instance exists in project
+        ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
         if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+            putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
             return result;
         }
-        // check process instance status
+        //check process instance status
         if (!processInstance.getState().isFinished()) {
-            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
+            putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR,
                     processInstance.getName(), processInstance.getState().toString(), "update");
             return result;
         }
@@ -620,11 +614,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             return result;
         }
 
-        ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
-        if (subInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
-            return result;
-        }
+        ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId)
+                .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId));
         if (subInstance.getIsSubProcess() == Flag.NO) {
             putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
             return result;
@@ -660,23 +651,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         if (result.get(Constants.STATUS) != Status.SUCCESS) {
             return result;
         }
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (null == processInstance) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
-            return result;
-        }
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+                .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
         // check process instance status
         if (!processInstance.getState().isFinished()) {
-            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
-                    processInstance.getName(), processInstance.getState().toString(), "delete");
-            return result;
+            throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete");
         }
 
         ProcessDefinition processDefinition =
                 processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
         if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
-            return result;
+            throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
         }
 
         try {
@@ -722,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
         ProcessDefinition processDefinition =
                 processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
         if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+            putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
             return result;
         }
 
@@ -811,7 +796,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
                 processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion());
         if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+            putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
             return result;
         }
         GanttDto ganttDto = new GanttDto();
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 392543e788..677d580e2d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -78,9 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.cronutils.model.Cron;
 
-/**
- * scheduler service impl
- */
 @Service
 public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService {
 
@@ -382,9 +379,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
                     return result;
             }
         } catch (Exception e) {
-            result.put(Constants.MSG,
-                    scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
-            throw new ServiceException(result.get(Constants.MSG).toString(), e);
+            Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
+                    : Status.OFFLINE_SCHEDULE_ERROR;
+            result.put(Constants.STATUS, status);
+            throw new ServiceException(status, e);
         }
 
         putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
index a574253d1d..3bb6479ba9 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
@@ -17,25 +17,27 @@
 package org.apache.dolphinscheduler.api.exceptions;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 public class ServiceExceptionTest {
+
     @Test
-    public void getCodeTest(){
+    public void getCodeTest() {
         ServiceException serviceException = new ServiceException();
-        Assert.assertNull(serviceException.getCode());
+        Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), serviceException.getCode());
 
         serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
-        Assert.assertNotNull(serviceException.getCode());
+        Assert.assertEquals(Status.ALERT_GROUP_EXIST.getCode(), serviceException.getCode());
 
         serviceException = new ServiceException(10012, "alarm group already exists");
-        Assert.assertNotNull(serviceException.getCode());
+        Assert.assertEquals(10012, serviceException.getCode());
     }
     @Test
-    public void getMessageTest(){
+    public void getMessageTest() {
         ServiceException serviceException = new ServiceException();
-        Assert.assertNull(serviceException.getMessage());
+        Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), serviceException.getMessage());
 
         serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
         Assert.assertNotNull(serviceException.getMessage());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9affbd2c2f..2a2409e06a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -179,7 +180,7 @@ public class ExecutorServiceTest {
         Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
         Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
         Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
-        Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
+        Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance));
         Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
         Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
         Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 4be8e246fc..1391ee0857 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
+
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -28,15 +31,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.text.MessageFormat;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
@@ -47,9 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
-
 /**
  * logger service test
  */
@@ -74,10 +73,8 @@ public class LoggerServiceTest {
     @Mock
     private TaskDefinitionMapper taskDefinitionMapper;
 
-    @Before
-    public void init() {
-        this.loggerService.init();
-    }
+    @Mock
+    private LogClient logClient;
 
     @Test
     public void testQueryDataSourceList() {
@@ -85,11 +82,11 @@ public class LoggerServiceTest {
         TaskInstance taskInstance = new TaskInstance();
         Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
         Result result = loggerService.queryLog(2, 1, 1);
-        //TASK_INSTANCE_NOT_FOUND
+        // TASK_INSTANCE_NOT_FOUND
         Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
 
         try {
-            //HOST NOT FOUND OR ILLEGAL
+            // HOST NOT FOUND OR ILLEGAL
             result = loggerService.queryLog(1, 1, 1);
         } catch (RuntimeException e) {
             Assert.assertTrue(true);
@@ -97,7 +94,7 @@ public class LoggerServiceTest {
         }
         Assert.assertEquals(Status.TASK_INSTANCE_HOST_IS_NULL.getCode(), result.getCode().intValue());
 
-        //SUCCESS
+        // SUCCESS
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
         Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
@@ -111,7 +108,7 @@ public class LoggerServiceTest {
         TaskInstance taskInstance = new TaskInstance();
         Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
 
-        //task instance is null
+        // task instance is null
         try {
             loggerService.getLogBytes(2);
         } catch (RuntimeException e) {
@@ -119,7 +116,7 @@ public class LoggerServiceTest {
             logger.error("testGetLogBytes error: {}", "task instance is null");
         }
 
-        //task instance host is null
+        // task instance host is null
         try {
             loggerService.getLogBytes(1);
         } catch (RuntimeException e) {
@@ -127,11 +124,13 @@ public class LoggerServiceTest {
             logger.error("testGetLogBytes error: {}", "task instance host is null");
         }
 
-        //success
+        // success
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
-        //if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
+        // if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
         // so no assert will be added here
+        Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
+                .thenReturn(new byte[0]);
         loggerService.getLogBytes(1);
 
     }
@@ -152,12 +151,12 @@ public class LoggerServiceTest {
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setProjectCode(projectCode);
         taskDefinition.setCode(1L);
-        //SUCCESS
+        // SUCCESS
         taskInstance.setTaskCode(1L);
         taskInstance.setId(1);
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,VIEW_LOG)).thenReturn(result);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
         Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
         Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
         result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
@@ -179,22 +178,20 @@ public class LoggerServiceTest {
         TaskDefinition taskDefinition = new TaskDefinition();
         taskDefinition.setProjectCode(projectCode);
         taskDefinition.setCode(1L);
-        //SUCCESS
+        // SUCCESS
         taskInstance.setTaskCode(1L);
         taskInstance.setId(1);
         taskInstance.setHost("127.0.0.1:8080");
         taskInstance.setLogPath("/temp/log");
-        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,DOWNLOAD_LOG )).thenReturn(result);
+        Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
+                .thenReturn(result);
         Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
         Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
+        Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
+                .thenReturn(new byte[0]);
         loggerService.getLogBytes(loginUser, projectCode, 1);
     }
 
-    @After
-    public void close() {
-        this.loggerService.close();
-    }
-
     /**
      * get mock Project
      *
@@ -218,4 +215,4 @@ public class LoggerServiceTest {
             result.put(Constants.MSG, status.getMsg());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index bfd8ecfa34..05c63ab537 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -203,7 +203,7 @@ public class ProcessDefinitionServiceTest {
                     .checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
             processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
         } catch (ServiceException serviceException) {
-            Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue());
+            Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode());
         }
 
         Map<String, Object> result = new HashMap<>();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 4167673faa..48dd44e468 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
@@ -70,6 +71,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -308,7 +310,8 @@ public class ProcessInstanceServiceTest {
         processDefinition.setProjectCode(projectCode);
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
-        when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
+        when(processService.findProcessInstanceDetailById(processInstance.getId()))
+                .thenReturn(Optional.of(processInstance));
         when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
         Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1);
@@ -353,7 +356,8 @@ public class ProcessInstanceServiceTest {
         res.setData("xxx");
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
-        when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
+        when(processService.findProcessInstanceDetailById(processInstance.getId()))
+                .thenReturn(Optional.of(processInstance));
         when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList);
         when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
         Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@@ -451,14 +455,18 @@ public class ProcessInstanceServiceTest {
         ProcessInstance processInstance = getProcessInstance();
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result);
-        when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
-        Map<String, Object> processInstanceNullRes =
-                processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
-                        shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
-        Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
+        when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
+        try {
+            Map<String, Object> processInstanceNullRes =
+                    processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
+                            shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+            Assert.fail();
+        } catch (ServiceException ex) {
+            Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
+        }
 
         // process instance not finish
-        when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
+        when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
         processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
         putMsg(result, Status.SUCCESS, projectCode);
         Map<String, Object> processInstanceNotFinishRes =
@@ -523,16 +531,20 @@ public class ProcessInstanceServiceTest {
         putMsg(result, Status.SUCCESS, projectCode);
         when(projectMapper.queryByCode(projectCode)).thenReturn(project);
         when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
-        when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
-        Map<String, Object> processInstanceNullRes =
-                processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
-        Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
+        when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
+        try {
+            // Optional.empty() is stubbed just above; the catch below expects a ServiceException.
+            processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
+            Assert.fail("expected ServiceException for a missing process instance");
+        } catch (ServiceException ex) {
+            Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
+        }
 
         // not sub process
         ProcessInstance processInstance = getProcessInstance();
         processInstance.setIsSubProcess(Flag.NO);
         putMsg(result, Status.SUCCESS, projectCode);
-        when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
+        when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
         Map<String, Object> notSubProcessRes =
                 processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
         Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index e1e783c2ac..dd06ce66ef 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -51,6 +51,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -158,7 +159,7 @@ public class TaskInstanceServiceTest {
             eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn);
         when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
         when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()))
-                .thenReturn(processInstance);
+                .thenReturn(Optional.of(processInstance));
 
         Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
             "test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);
diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index d921d54000..0d04ed8596 100644
--- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -46,10 +46,8 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -168,9 +166,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
                 if (!checkPathSecurity(logPath)) {
                     throw new IllegalArgumentException("Illegal path");
                 }
-                Set<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
+                List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
                 channel.writeAndFlush(
-                        new GetAppIdResponseCommand(new ArrayList<>(appIds)).convert2Command(command.getOpaque()));
+                        new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
                 break;
             default:
                 throw new IllegalArgumentException("unknown commandType: " + commandType);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 9e1db701fa..bdb7289aad 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -73,17 +73,21 @@ public class MasterFailoverService {
 
     private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    private final LogClient logClient;
+
     public MasterFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
                                  @NonNull ProcessService processService,
                                  @NonNull NettyExecutorManager nettyExecutorManager,
-                                 @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
+                                 @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
+                                 @NonNull LogClient logClient) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
-        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
         this.nettyExecutorManager = nettyExecutorManager;
+        this.localAddress = masterConfig.getMasterAddress();
         this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+        this.logClient = logClient;
 
     }
 
@@ -233,7 +237,7 @@ public class MasterFailoverService {
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {
                 // only kill yarn job if exists , the local thread has exited
                 LOGGER.info("TaskInstance failover begin kill the task related yarn job");
-                ProcessUtils.killYarnJob(taskExecutionContext);
+                ProcessUtils.killYarnJob(logClient, taskExecutionContext);
             }
             // kill worker task, When the master failover and worker failover happened in the same time,
             // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 99c62f1172..a294874ff4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -17,13 +17,16 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,13 +40,14 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
-
+import javax.annotation.Nullable;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -52,14 +56,6 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import lombok.NonNull;
-
 @Service
 public class WorkerFailoverService {
 
@@ -70,19 +66,22 @@ public class WorkerFailoverService {
     private final ProcessService processService;
     private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
     private final ProcessInstanceExecCacheManager cacheManager;
+    private final LogClient logClient;
     private final String localAddress;
 
     public WorkerFailoverService(@NonNull RegistryClient registryClient,
                                  @NonNull MasterConfig masterConfig,
                                  @NonNull ProcessService processService,
                                  @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
-                                 @NonNull ProcessInstanceExecCacheManager cacheManager) {
+                                 @NonNull ProcessInstanceExecCacheManager cacheManager,
+                                 @NonNull LogClient logClient) {
         this.registryClient = registryClient;
         this.masterConfig = masterConfig;
         this.processService = processService;
         this.workflowExecuteThreadPool = workflowExecuteThreadPool;
         this.cacheManager = cacheManager;
-        this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.logClient = logClient;
+        this.localAddress = masterConfig.getMasterAddress();
     }
 
     /**
@@ -172,7 +171,7 @@ public class WorkerFailoverService {
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {
                 // only kill yarn job if exists , the local thread has exited
                 LOGGER.info("TaskInstance failover begin kill the task related yarn job");
-                ProcessUtils.killYarnJob(taskExecutionContext);
+                ProcessUtils.killYarnJob(logClient, taskExecutionContext);
             }
         } else {
             LOGGER.info("The failover taskInstance is a master task");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
index 42d9d14e49..c196def48a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
@@ -63,8 +63,7 @@ public class DataQualityResultOperator {
         if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
 
             ProcessInstance processInstance =
-                    processService.findProcessInstanceDetailById(
-                            Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId())));
+                    processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()).orElse(null);
 
             // when the task is failure or cancel, will delete the execute result and statistics value
             if (taskResponseEvent.getState().isFailure()
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index b71e267216..23f209bbdf 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,8 +25,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -34,7 +32,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.Arrays;
 import java.util.Date;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Optional;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -58,26 +56,17 @@ public class MasterRegistryClientTest {
     @InjectMocks
     private MasterRegistryClient masterRegistryClient;
 
-    @Mock
-    private MasterConfig masterConfig;
-
     @Mock
     private RegistryClient registryClient;
 
-    @Mock
-    private ScheduledExecutorService heartBeatExecutor;
-
     @Mock
     private ProcessService processService;
 
-    @Mock
-    private MasterConnectStrategy masterConnectStrategy;
-
     @Mock
     private MasterHeartBeatTask masterHeartBeatTask;
 
     @Mock
-    private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
+    private MasterConfig masterConfig;
 
     @Before
     public void before() throws Exception {
@@ -105,7 +94,7 @@ public class MasterRegistryClientTest {
         taskInstance.setHost("127.0.0.1:8080");
         given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
                 .willReturn(Arrays.asList(taskInstance));
-        given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+        given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(Optional.of(processInstance));
         given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
         Server server = new Server();
         server.setHost("127.0.0.1");
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index e0c3c94714..27be919ff9 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service;
 
 import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
 
@@ -38,12 +37,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.Optional;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -89,6 +90,9 @@ public class FailoverServiceTest {
     @Mock
     private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
 
+    @Mock
+    private LogClient logClient;
+
     private static int masterPort = 5678;
     private static int workerPort = 1234;
 
@@ -106,17 +110,20 @@ public class FailoverServiceTest {
         springApplicationContext.setApplicationContext(applicationContext);
 
         given(masterConfig.getListenPort()).willReturn(masterPort);
+        testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
+        given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
         MasterFailoverService masterFailoverService =
-                new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
+                new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
+                        processInstanceExecCacheManager, logClient);
         WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
                 masterConfig,
                 processService,
                 workflowExecuteThreadPool,
-                cacheManager);
+                cacheManager,
+                logClient);
 
         failoverService = new FailoverService(masterFailoverService, workerFailoverService);
 
-        testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
         String ip = testMasterHost.split(":")[0];
         int port = Integer.valueOf(testMasterHost.split(":")[1]);
         Assert.assertEquals(masterPort, port);
@@ -158,7 +165,8 @@ public class FailoverServiceTest {
         doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
         given(processService.findValidTaskListByProcessId(Mockito.anyInt()))
                 .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
-        given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+        given(processService.findProcessInstanceDetailById(Mockito.anyInt()))
+                .willReturn(Optional.ofNullable(processInstance));
 
         Thread.sleep(1000);
         Server masterServer = new Server();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 6668f2d36c..d47d68fa3d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -17,6 +17,17 @@
 
 package org.apache.dolphinscheduler.remote;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
 import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -35,6 +46,8 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.remote.utils.NettyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,112 +56,48 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
-
-/**
- * remoting netty client
- */
-public class NettyRemotingClient {
+public class NettyRemotingClient implements AutoCloseable {
 
     private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
 
-    /**
-     * client bootstrap
-     */
     private final Bootstrap bootstrap = new Bootstrap();
 
-    /**
-     * encoder
-     */
     private final NettyEncoder encoder = new NettyEncoder();
 
-    /**
-     * channels
-     */
     private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
 
-    /**
-     * started flag
-     */
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
-    /**
-     * worker group
-     */
     private final EventLoopGroup workerGroup;
 
-    /**
-     * client config
-     */
     private final NettyClientConfig clientConfig;
 
-    /**
-     * saync semaphore
-     */
     private final Semaphore asyncSemaphore = new Semaphore(200, true);
 
-    /**
-     * callback thread executor
-     */
     private final ExecutorService callbackExecutor;
 
-    /**
-     * client handler
-     */
     private final NettyClientHandler clientHandler;
 
-    /**
-     * response future executor
-     */
     private final ScheduledExecutorService responseFutureExecutor;
 
-    /**
-     * client init
-     *
-     * @param clientConfig client config
-     */
     public NettyRemotingClient(final NettyClientConfig clientConfig) {
         this.clientConfig = clientConfig;
         if (Epoll.isAvailable()) {
-            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
         } else {
-            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
-                private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
-                }
-            });
+            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
         }
-        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
-                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
+        this.callbackExecutor = new ThreadPoolExecutor(
+                Constants.CPUS,
+                Constants.CPUS,
+                1,
+                TimeUnit.MINUTES,
+                new LinkedBlockingQueue<>(1000),
+                new NamedThreadFactory("CallbackExecutor"),
                 new CallerThreadExecutePolicy());
         this.clientHandler = new NettyClientHandler(this, callbackExecutor);
 
@@ -157,9 +106,6 @@ public class NettyRemotingClient {
         this.start();
     }
 
-    /**
-     * start
-     */
     private void start() {
 
         this.bootstrap
@@ -371,9 +317,7 @@ public class NettyRemotingClient {
         return null;
     }
 
-    /**
-     * close
-     */
+    @Override
     public void close() {
         if (isStarted.compareAndSet(true, false)) {
             try {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
index 739cbbebe1..18ee88f696 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -17,88 +17,52 @@
 
 package org.apache.dolphinscheduler.remote.config;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.apache.dolphinscheduler.remote.utils.Constants;
 
-/**
- * netty client config
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class NettyClientConfig {
 
     /**
      * worker threads, defaults to the number of machine CPUs
      */
+    @Builder.Default
     private int workerThreads = Constants.CPUS;
 
     /**
      * whether TCP no-delay is enabled
      */
+    @Builder.Default
     private boolean tcpNoDelay = true;
 
     /**
      * whether keep alive
      */
+    @Builder.Default
     private boolean soKeepalive = true;
 
     /**
      * send buffer size
      */
+    @Builder.Default
     private int sendBufferSize = 65535;
 
     /**
      * receive buffer size
      */
+    @Builder.Default
     private int receiveBufferSize = 65535;
 
     /**
      * connect timeout millis
      */
+    @Builder.Default
     private int connectTimeoutMillis = 3000;
 
-    public int getWorkerThreads() {
-        return workerThreads;
-    }
-
-    public void setWorkerThreads(int workerThreads) {
-        this.workerThreads = workerThreads;
-    }
-
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    public boolean isSoKeepalive() {
-        return soKeepalive;
-    }
-
-    public void setSoKeepalive(boolean soKeepalive) {
-        this.soKeepalive = soKeepalive;
-    }
-
-    public int getSendBufferSize() {
-        return sendBufferSize;
-    }
-
-    public void setSendBufferSize(int sendBufferSize) {
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    public int getReceiveBufferSize() {
-        return receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize(int receiveBufferSize) {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    public int getConnectTimeoutMillis() {
-        return connectTimeoutMillis;
-    }
-
-    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
-        this.connectTimeoutMillis = connectTimeoutMillis;
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 4392d6e14a..a8361fa4ed 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -30,10 +30,11 @@ import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -186,17 +187,15 @@ public class ProcessUtils {
      * @param taskExecutionContext taskExecutionContext
      * @return yarn application ids
      */
-    public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
+    public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
+                                                     @NonNull TaskExecutionContext taskExecutionContext) {
         if (taskExecutionContext.getLogPath() == null) {
             return Collections.emptyList();
         }
         try {
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-            List<String> appIds;
-            try (LogClientService logClient = new LogClientService()) {
-                Host host = Host.of(taskExecutionContext.getHost());
-                appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
-            }
+            Host host = Host.of(taskExecutionContext.getHost());
+            List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
             if (CollectionUtils.isNotEmpty(appIds)) {
                 if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
                     taskExecutionContext
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
similarity index 57%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
index 735fbab7c4..611bb49b67 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import javax.annotation.Nullable;
@@ -46,43 +45,23 @@ import lombok.NonNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
-/**
- * log client
- */
-public class LogClientService implements AutoCloseable {
+@Service
+public class LogClient implements AutoCloseable {
 
-    private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+    private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
 
-    private final NettyClientConfig clientConfig;
+    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 
     private final NettyRemotingClient client;
 
-    private volatile boolean isRunning;
-
-    /**
-     * request time out
-     */
     private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L;
 
-    /**
-     * construct client
-     */
-    public LogClientService() {
-        this.clientConfig = new NettyClientConfig();
-        this.clientConfig.setWorkerThreads(4);
-        this.client = new NettyRemotingClient(clientConfig);
-        this.isRunning = true;
-    }
-
-    /**
-     * close
-     */
-    @Override
-    public void close() {
-        this.client.close();
-        this.isRunning = false;
-        logger.info("logger client closed");
+    public LogClient() {
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        this.client = new NettyRemotingClient(nettyClientConfig);
+        logger.info("Initialized LogClientService with config: {}", nettyClientConfig);
     }
 
     /**
@@ -96,25 +75,30 @@ public class LogClientService implements AutoCloseable {
      * @return log content
      */
     public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
-        logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
+        logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
                 skipLineNum, limit);
         RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
-        String result = "";
         final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
-            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+            Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
-                        response.getBody(), RollViewLogResponseCommand.class);
+                RollViewLogResponseCommand rollReviewLog =
+                        JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
                 return rollReviewLog.getMsg();
             }
+            return "Roll view log response is null";
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error(
+                    "Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error, the current thread has been interrupted",
+                    host, port, path, skipLineNum, limit, ex);
+            return "Roll view log error: " + ex.getMessage();
         } catch (Exception e) {
-            logger.error("roll view log error", e);
-        } finally {
-            this.client.closeChannel(address);
+            logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port,
+                    path, skipLineNum, limit, e);
+            return "Roll view log error: " + e.getMessage();
         }
-        return result;
     }
 
     /**
@@ -126,28 +110,31 @@ public class LogClientService implements AutoCloseable {
      * @return log content
      */
     public String viewLog(String host, int port, String path) {
-        logger.info("view log path {}", path);
+        logger.info("View log from host: {}, port: {}, logPath: {}", host, port, path);
         ViewLogRequestCommand request = new ViewLogRequestCommand(path);
-        String result = "";
         final Host address = new Host(host, port);
         try {
             if (NetUtils.getHost().equals(host)) {
-                result = LoggerUtils.readWholeFileContent(request.getPath());
+                return LoggerUtils.readWholeFileContent(request.getPath());
             } else {
                 Command command = request.convert2Command();
                 Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
                 if (response != null) {
-                    ViewLogResponseCommand viewLog = JSONUtils.parseObject(
-                            response.getBody(), ViewLogResponseCommand.class);
-                    result = viewLog.getMsg();
+                    ViewLogResponseCommand viewLog =
+                            JSONUtils.parseObject(response.getBody(), ViewLogResponseCommand.class);
+                    return viewLog.getMsg();
                 }
+                return "View log response is null";
             }
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error("View log from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
+                    host, port, path, ex);
+            return "View log error: " + ex.getMessage();
         } catch (Exception e) {
-            logger.error("view log error", e);
-        } finally {
-            this.client.closeChannel(address);
+            logger.error("View log from host: {}, port: {}, logPath: {} error", host, port, path, e);
+            return "View log error: " + e.getMessage();
         }
-        return result;
     }
 
     /**
@@ -159,23 +146,28 @@ public class LogClientService implements AutoCloseable {
      * @return log content bytes
      */
     public byte[] getLogBytes(String host, int port, String path) {
-        logger.info("log path {}", path);
+        logger.info("Get log bytes from host: {}, port: {}, logPath {}", host, port, path);
         GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
         final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                GetLogBytesResponseCommand getLog = JSONUtils.parseObject(
-                        response.getBody(), GetLogBytesResponseCommand.class);
-                return getLog.getData() == null ? new byte[0] : getLog.getData();
+                GetLogBytesResponseCommand getLog =
+                        JSONUtils.parseObject(response.getBody(), GetLogBytesResponseCommand.class);
+                return getLog.getData() == null ? EMPTY_BYTE_ARRAY : getLog.getData();
             }
+            return EMPTY_BYTE_ARRAY;
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error(
+                    "Get logSize from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
+                    host, port, path, ex);
+            return EMPTY_BYTE_ARRAY;
         } catch (Exception e) {
-            logger.error("get log size error", e);
-        } finally {
-            this.client.closeChannel(address);
+            logger.error("Get logSize from host: {}, port: {}, logPath: {} error", host, port, path, e);
+            return EMPTY_BYTE_ARRAY;
         }
-        return new byte[0];
     }
 
     /**
@@ -187,24 +179,28 @@ public class LogClientService implements AutoCloseable {
      * @return remove task status
      */
     public Boolean removeTaskLog(String host, int port, String path) {
-        logger.info("log path {}", path);
+        logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
         RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
-        Boolean result = false;
         final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
             Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
-                RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject(
-                        response.getBody(), RemoveTaskLogResponseCommand.class);
+                RemoveTaskLogResponseCommand taskLogResponse =
+                        JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
                 return taskLogResponse.getStatus();
             }
+            return false;
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.error(
+                    "Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
+                    host, port, path, ex);
+            return false;
         } catch (Exception e) {
-            logger.error("remove task log error", e);
-        } finally {
-            this.client.closeChannel(address);
+            logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
+            return false;
         }
-        return result;
     }
 
     public @Nullable List<String> getAppIds(@NonNull String host, int port,
@@ -212,26 +208,25 @@ public class LogClientService implements AutoCloseable {
         logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath);
         final Host workerAddress = new Host(host, port);
         List<String> appIds = null;
-        try {
-            if (NetUtils.getHost().equals(host)) {
-                appIds = new ArrayList<>(LogUtils.getAppIdsFromLogFile(taskLogFilePath));
-            } else {
-                final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
-                Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
-                if (response != null) {
-                    GetAppIdResponseCommand responseCommand =
-                            JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
-                    appIds = responseCommand.getAppIds();
-                }
+        if (NetUtils.getHost().equals(host)) {
+            appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
+        } else {
+            final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
+            Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
+            if (response != null) {
+                GetAppIdResponseCommand responseCommand =
+                        JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
+                appIds = responseCommand.getAppIds();
             }
-        } finally {
-            client.closeChannel(workerAddress);
         }
         logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath);
         return appIds;
     }
 
-    public boolean isRunning() {
-        return isRunning;
+    @Override
+    public void close() {
+        this.client.close();
+        logger.info("LogClientService closed");
     }
+
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
deleted file mode 100644
index f3c1078f07..0000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.log;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- *  log asyc callback
- */
-public class LogPromise {
-
-    private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
-
-    /**
-     *  request unique identification
-     */
-    private long opaque;
-
-    /**
-     *  start timemillis
-     */
-    private final long start;
-
-    /**
-     *  timeout
-     */
-    private final long timeout;
-
-    /**
-     *  latch
-     */
-    private final CountDownLatch latch;
-
-    /**
-     *  result
-     */
-    private Object result;
-
-    public LogPromise(long opaque, long timeout) {
-        this.opaque = opaque;
-        this.timeout = timeout;
-        this.start = System.currentTimeMillis();
-        this.latch = new CountDownLatch(1);
-        PROMISES.put(opaque, this);
-    }
-
-    /**
-     *  notify client finish
-     * @param opaque unique identification
-     * @param result result
-     */
-    public static void notify(long opaque, Object result) {
-        LogPromise promise = PROMISES.remove(opaque);
-        if (promise != null) {
-            promise.doCountDown(result);
-        }
-    }
-
-    /**
-     *  countdown
-     *
-     * @param result result
-     */
-    private void doCountDown(Object result) {
-        this.result = result;
-        this.latch.countDown();
-    }
-
-    /**
-     *  whether timeout
-     * @return timeout
-     */
-    public boolean isTimeout() {
-        return System.currentTimeMillis() - start > timeout;
-    }
-
-    /**
-     *  get result
-     * @return
-     */
-    public Object getResult() {
-        try {
-            latch.await(timeout, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException ignore) {
-            Thread.currentThread().interrupt();
-        }
-        PROMISES.remove(opaque);
-        return this.result;
-    }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a94b01e609..e11ac7820e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.springframework.transaction.annotation.Transactional;
 
@@ -77,7 +78,7 @@ public interface ProcessService {
 
     boolean verifyIsNeedCreateCommand(Command command);
 
-    ProcessInstance findProcessInstanceDetailById(int processId);
+    Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
 
     List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 513f02f161..c213346451 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,21 +17,14 @@
 
 package org.apache.dolphinscheduler.service.process;
 
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -133,13 +126,17 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
 import org.apache.dolphinscheduler.service.exceptions.CronParseException;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.expand.CuringParamsService;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -150,22 +147,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import io.micrometer.core.annotation.Counted;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
 
 /**
  * Process-related DAO that holds some of the mappers.
@@ -278,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService {
     @Autowired
     private CuringParamsService curingGlobalParamsService;
 
+    @Autowired
+    private LogClient logClient;
+
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in transaction
      *
@@ -474,8 +476,8 @@ public class ProcessServiceImpl implements ProcessService {
      * @return process instance
      */
     @Override
-    public ProcessInstance findProcessInstanceDetailById(int processId) {
-        return processInstanceMapper.queryDetailById(processId);
+    public Optional<ProcessInstance> findProcessInstanceDetailById(int processId) {
+        return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
     }
 
     /**
@@ -597,16 +599,14 @@ public class ProcessServiceImpl implements ProcessService {
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
-        try (LogClientService logClient = new LogClientService()) {
-            for (TaskInstance taskInstance : taskInstanceList) {
-                String taskLogPath = taskInstance.getLogPath();
-                if (Strings.isNullOrEmpty(taskInstance.getHost())) {
-                    continue;
-                }
-                Host host = Host.of(taskInstance.getHost());
-                // remove task log from loggerserver
-                logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
+        for (TaskInstance taskInstance : taskInstanceList) {
+            String taskLogPath = taskInstance.getLogPath();
+            if (Strings.isNullOrEmpty(taskInstance.getHost())) {
+                continue;
             }
+            Host host = Host.of(taskInstance.getHost());
+            // remove task log from loggerserver
+            logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
         }
     }
 
@@ -749,7 +749,7 @@ public class ProcessServiceImpl implements ProcessService {
      */
     private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
                                                        Command command,
-                                                       Map<String, String> cmdParam) throws CodeGenerateException {
+                                                       Map<String, String> cmdParam) {
         ProcessInstance processInstance = new ProcessInstance(processDefinition);
         processInstance.setProcessDefinitionCode(processDefinition.getCode());
         processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@@ -912,8 +912,8 @@ public class ProcessServiceImpl implements ProcessService {
      * @param host    host
      * @return process instance
      */
-    protected ProcessInstance constructProcessInstance(Command command,
-                                                       String host) throws CronParseException, CodeGenerateException {
+    protected @Nullable ProcessInstance constructProcessInstance(Command command,
+                                                                 String host) throws CronParseException, CodeGenerateException {
         ProcessInstance processInstance;
         ProcessDefinition processDefinition;
         CommandType commandType = command.getCommandType();
@@ -929,7 +929,7 @@ public class ProcessServiceImpl implements ProcessService {
         if (processInstanceId == 0) {
             processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
         } else {
-            processInstance = this.findProcessInstanceDetailById(processInstanceId);
+            processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
             if (processInstance == null) {
                 return null;
             }
@@ -1068,7 +1068,8 @@ public class ProcessServiceImpl implements ProcessService {
      *
      * @return ProcessDefinition
      */
-    private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
+    private @Nullable ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode,
+                                                                      Map<String, String> cmdParam) {
         if (cmdParam != null) {
             int processInstanceId = 0;
             if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
@@ -1080,7 +1081,7 @@ public class ProcessServiceImpl implements ProcessService {
             }
 
             if (processInstanceId != 0) {
-                ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
+                ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
                 if (processInstance == null) {
                     return null;
                 }
@@ -1174,7 +1175,8 @@ public class ProcessServiceImpl implements ProcessService {
         // copy parent instance user def params to sub process..
         String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
         if (!Strings.isNullOrEmpty(parentInstanceId)) {
-            ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
+            ProcessInstance parentInstance =
+                    findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)).orElse(null);
             if (parentInstance != null) {
                 subProcessInstance.setGlobalParams(
                         joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
@@ -3146,7 +3148,7 @@ public class ProcessServiceImpl implements ProcessService {
         if (task == null) {
             return;
         }
-        ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
+        ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
         if (processInstance != null
                 && (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
             List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
similarity index 82%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
index 6a6fb4c3bd..c0b6001303 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
@@ -40,8 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({LogClientService.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
-public class LogClientServiceTest {
+@PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
+public class LogClientTest {
 
     @Test
     public void testViewLogFromLocal() {
@@ -54,8 +54,8 @@ public class LogClientServiceTest {
         PowerMockito.mockStatic(LoggerUtils.class);
         PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
 
-        LogClientService logClientService = new LogClientService();
-        String log = logClientService.viewLog(localMachine, port, path);
+        LogClient logClient = new LogClient();
+        String log = logClient.viewLog(localMachine, port, path);
         Assert.assertNotNull(log);
     }
 
@@ -75,8 +75,8 @@ public class LogClientServiceTest {
         command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
         PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
                 .thenReturn(command);
-        LogClientService logClientService = new LogClientService();
-        String log = logClientService.viewLog(localMachine, port, path);
+        LogClient logClient = new LogClient();
+        String log = logClient.viewLog(localMachine, port, path);
         Assert.assertNotNull(log);
     }
 
@@ -86,8 +86,8 @@ public class LogClientServiceTest {
         PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
         PowerMockito.doNothing().when(remotingClient).close();
 
-        LogClientService logClientService = new LogClientService();
-        logClientService.close();
+        LogClient logClient = new LogClient();
+        logClient.close();
     }
 
     @Test
@@ -100,8 +100,8 @@ public class LogClientServiceTest {
         PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
                 .thenReturn(command);
 
-        LogClientService logClientService = new LogClientService();
-        String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
+        LogClient logClient = new LogClient();
+        String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
         Assert.assertNotNull(msg);
     }
 
@@ -115,8 +115,8 @@ public class LogClientServiceTest {
         PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
                 .thenReturn(command);
 
-        LogClientService logClientService = new LogClientService();
-        byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log");
+        LogClient logClient = new LogClient();
+        byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log");
         Assert.assertNotNull(logBytes);
     }
 
@@ -130,14 +130,9 @@ public class LogClientServiceTest {
         PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
                 .thenReturn(command);
 
-        LogClientService logClientService = new LogClientService();
-        Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path");
+        LogClient logClient = new LogClient();
+        Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
         Assert.assertTrue(status);
     }
 
-    @Test
-    public void testIsRunning() {
-        LogClientService logClientService = new LogClientService();
-        Assert.assertTrue(logClientService.isRunning());
-    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
index 9ee702fd20..5be423d8c6 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
 import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import java.util.Set;
+import java.util.List;
 
 public abstract class AbstractRemoteTask extends AbstractTask {
 
@@ -38,7 +38,7 @@ public abstract class AbstractRemoteTask extends AbstractTask {
         this.cancelApplication();
     }
 
-    public abstract Set<String> getApplicationIds() throws TaskException;
+    public abstract List<String> getApplicationIds() throws TaskException;
 
     public abstract void cancelApplication() throws TaskException;
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index e51429692f..5e3ec8fab2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 
-import java.util.Set;
-import java.util.regex.Matcher;
+import java.util.List;
 import java.util.regex.Pattern;
 
 /**
@@ -106,24 +105,11 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
      * @return
      * @throws TaskException
      */
-    public Set<String> getApplicationIds() throws TaskException {
+    @Override
+    public List<String> getApplicationIds() throws TaskException {
         return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger);
     }
 
-    /**
-     * find app id
-     *
-     * @param line line
-     * @return appid
-     */
-    protected String findAppId(String line) {
-        Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
-        if (matcher.find()) {
-            return matcher.group();
-        }
-        return null;
-    }
-
     /**
      * create command
      *
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
index 0df5c3a911..c3833c071a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -23,8 +23,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -42,14 +44,14 @@ public class LogUtils {
 
     private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
 
-    public Set<String> getAppIdsFromLogFile(@NonNull String logPath) {
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
         return getAppIdsFromLogFile(logPath, log);
     }
 
-    public Set<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
+    public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
         File logFile = new File(logPath);
         if (!logFile.exists() || !logFile.isFile()) {
-            return Collections.emptySet();
+            return Collections.emptyList();
         }
         Set<String> appIds = new HashSet<>();
         try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
@@ -65,10 +67,10 @@ public class LogUtils {
                     }
                 }
             });
-            return appIds;
+            return new ArrayList<>(appIds);
         } catch (IOException e) {
             logger.error("Get appId from log file erro, logPath: {}", logPath, e);
-            return Collections.emptySet();
+            return Collections.emptyList();
         }
     }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
index bd961d3137..d72a9737bf 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.task.api.utils;
 
-import java.util.Set;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
 
 public class LogUtilsTest {
 
@@ -31,7 +31,7 @@ public class LogUtilsTest {
 
     @Test
     public void getAppIdsFromLogFile() {
-        Set<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
-        Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds);
+        List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
+        Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
     }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index 51751fe10d..11269d3ac7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -17,13 +17,9 @@
 
 package org.apache.dolphinscheduler.plugin.task.dinky;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.MissingNode;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -31,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
@@ -45,10 +42,13 @@ import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.MissingNode;
 
 public class DinkyTask extends AbstractRemoteTask {
 
@@ -73,8 +73,8 @@ public class DinkyTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
@@ -112,20 +112,23 @@ public class DinkyTask extends AbstractRemoteTask {
                     if (!checkResult(jobInstanceInfoResult)) {
                         break;
                     }
-                    String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
+                    String jobInstanceStatus =
+                            jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
                     switch (jobInstanceStatus) {
                         case DinkyTaskConstants.STATUS_FINISHED:
                             final int exitStatusCode = mapStatusToExitCode(status);
                             // Use address-taskId as app id
                             setAppIds(String.format("%s-%s", address, taskId));
                             setExitStatusCode(exitStatusCode);
-                            logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
+                            logger.info("dinky task finished with results: {}",
+                                    result.get(DinkyTaskConstants.API_RESULT_DATAS));
                             finishFlag = true;
                             break;
                         case DinkyTaskConstants.STATUS_FAILED:
                         case DinkyTaskConstants.STATUS_CANCELED:
                         case DinkyTaskConstants.STATUS_UNKNOWN:
-                            errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
+                            errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
+                                    .asText());
                             finishFlag = true;
                             break;
                         default:
@@ -191,14 +194,14 @@ public class DinkyTask extends AbstractRemoteTask {
         String address = this.dinkyParameters.getAddress();
         String taskId = this.dinkyParameters.getTaskId();
         logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}",
-            this.taskExecutionContext.getTaskInstanceId(),
-            address,
-            taskId);
+                this.taskExecutionContext.getTaskInstanceId(),
+                address,
+                taskId);
         cancelTask(address, taskId);
         logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}",
-            this.taskExecutionContext.getTaskInstanceId(),
-            address,
-            taskId);
+                this.taskExecutionContext.getTaskInstanceId(),
+                address,
+                taskId);
     }
 
     private JsonNode submitTask(String address, String taskId) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index a746185805..0b44b7c894 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -17,6 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.emr;
 
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import com.amazonaws.SdkBaseException;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
 import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -31,16 +40,6 @@ import com.amazonaws.services.elasticmapreduce.model.StepStatus;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Sets;
 
-import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 /**
  * AddJobFlowSteps task executor
  *
@@ -51,10 +50,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
     private String stepId;
 
     private final HashSet<String> waitingStateSet = Sets.newHashSet(
-        StepState.PENDING.toString(),
-        StepState.CANCEL_PENDING.toString(),
-        StepState.RUNNING.toString()
-    );
+            StepState.PENDING.toString(),
+            StepState.CANCEL_PENDING.toString(),
+            StepState.RUNNING.toString());
 
     /**
      * constructor
@@ -66,8 +64,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
@@ -126,13 +124,16 @@ public class EmrAddStepsTask extends AbstractEmrTask {
 
         final AddJobFlowStepsRequest addJobFlowStepsRequest;
         try {
-            addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
+            addJobFlowStepsRequest =
+                    objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
         } catch (JsonProcessingException e) {
             throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
         }
 
-        // When a single task definition is associated with multiple steps, the state tracking will have high complexity.
-        // Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state.
+        // When a single task definition is associated with multiple steps, the state tracking will have high
+        // complexity.
+        // Therefore, A task definition only supports the association of a single step, which can better ensure the
+        // reliability of the task state.
         if (addJobFlowStepsRequest.getSteps().size() > 1) {
             throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
         }
@@ -178,7 +179,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
 
     @Override
     public void cancelApplication() throws TaskException {
-        logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
+        logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
+                this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
         CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
         CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
 
@@ -187,10 +189,10 @@ public class EmrAddStepsTask extends AbstractEmrTask {
         }
 
         CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
-            .stream()
-            .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
-            .findFirst()
-            .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
+                .stream()
+                .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
+                .findFirst()
+                .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
 
         if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
             throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index 770fb9f996..38a1177555 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.SdkBaseException;
@@ -57,8 +57,8 @@ public class EmrJobFlowTask extends AbstractEmrTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index f6d4e56815..7e63db7462 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -29,11 +29,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class FlinkStreamTask extends FlinkTask implements StreamTask {
 
@@ -99,7 +95,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
 
     @Override
     public void cancelApplication() throws TaskException {
-        Set<String> appIds = getApplicationIds();
+        List<String> appIds = getApplicationIds();
         if (CollectionUtils.isEmpty(appIds)) {
             logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
             return;
@@ -120,7 +116,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
 
     @Override
     public void savePoint() throws Exception {
-        Set<String> appIds = getApplicationIds();
+        List<String> appIds = getApplicationIds();
         if (CollectionUtils.isEmpty(appIds)) {
             logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
             return;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index a342b2aac3..397b2bc38c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -19,30 +19,13 @@ package org.apache.dolphinscheduler.plugin.task.flink;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -113,34 +96,6 @@ public class FlinkTask extends AbstractYarnTask {
         return flinkParameters;
     }
 
-    @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        Set<String> appIds = new HashSet<>();
-
-        File file = new File(taskRequest.getLogPath());
-        if (!file.exists()) {
-            return appIds;
-        }
-
-        /*
-         * analysis log? get submitted yarn application id
-         */
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
-            String line;
-            while ((line = br.readLine()) != null) {
-                String appId = findAppId(line);
-                if (StringUtils.isNotEmpty(appId)) {
-                    appIds.add(appId);
-                }
-            }
-        } catch (FileNotFoundException e) {
-            throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath());
-        } catch (IOException e) {
-            throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e);
-        }
-        return appIds;
-    }
-
     /**
      * find app id
      *
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 06ae0b953e..dc0aa6a357 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.hivecli;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -39,7 +38,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class HiveCliTask extends AbstractRemoteTask {
 
@@ -59,8 +57,8 @@ public class HiveCliTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 6587fddc31..a83f4081ff 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -17,10 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.jupyter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -41,7 +38,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class JupyterTask extends AbstractRemoteTask {
 
@@ -66,8 +64,8 @@ public class JupyterTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index b405ae5f17..691e9abf3a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class K8sTask extends AbstractK8sTask {
 
@@ -59,8 +59,8 @@ public class K8sTask extends AbstractK8sTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
@@ -72,7 +72,7 @@ public class K8sTask extends AbstractK8sTask {
     protected String buildCommand() {
         K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
         Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-        Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+        Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
         String namespaceName = namespace.get(NAMESPACE_NAME);
         String clusterName = namespace.get(CLUSTER);
         k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 177122847e..debf913431 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -17,10 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.pigeon;
 
-import org.apache.commons.collections4.CollectionUtils;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -28,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.StatusLine;
 import org.apache.http.client.ClientProtocolException;
@@ -37,10 +36,7 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
 
-import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -48,9 +44,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+
 /**
  * TIS DataX Task
  **/
@@ -70,8 +68,8 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
@@ -103,9 +101,10 @@ public class PigeonTask extends AbstractRemoteTask {
             ExecResult execState = null;
             int taskId;
             WebSocketClient webSocket = null;
-            try (CloseableHttpClient client = HttpClients.createDefault();
-                 // trigger to start PIGEON dataX task
-                 CloseableHttpResponse response = client.execute(post)) {
+            try (
+                    CloseableHttpClient client = HttpClients.createDefault();
+                    // trigger to start PIGEON dataX task
+                    CloseableHttpResponse response = client.execute(post)) {
                 triggerResult = processResponse(triggerUrl, response, BizResult.class);
                 if (!triggerResult.isSuccess()) {
                     List<String> errormsg = triggerResult.getErrormsg();
@@ -155,7 +154,8 @@ public class PigeonTask extends AbstractRemoteTask {
             long costTime = System.currentTimeMillis() - startTime;
             logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
                     targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
-            setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE);
+            setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS
+                    : TaskConstants.EXIT_CODE_FAILURE);
         } catch (Exception e) {
             logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e);
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@@ -187,15 +187,17 @@ public class PigeonTask extends AbstractRemoteTask {
         logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
         final String triggerUrl = getTriggerUrl();
 
-        StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
+        StringEntity entity =
+                new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
 
         CancelResult cancelResult = null;
         HttpPost post = new HttpPost(triggerUrl);
         addFormUrlencoded(post);
         post.setEntity(entity);
-        try (CloseableHttpClient client = HttpClients.createDefault();
-             // trigger to start TIS dataX task
-             CloseableHttpResponse response = client.execute(post)) {
+        try (
+                CloseableHttpClient client = HttpClients.createDefault();
+                // trigger to start TIS dataX task
+                CloseableHttpResponse response = client.execute(post)) {
             cancelResult = processResponse(triggerUrl, response, CancelResult.class);
             if (!cancelResult.isSuccess()) {
                 List<String> errormsg = triggerResult.getErrormsg();
@@ -229,6 +231,7 @@ public class PigeonTask extends AbstractRemoteTask {
         final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId);
         logger.info("apply ws connection,uri:{}", applyURI);
         WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) {
+
             @Override
             public void onOpen(ServerHandshake handshakedata) {
                 logger.info("start to receive remote execute log");
@@ -254,7 +257,8 @@ public class PigeonTask extends AbstractRemoteTask {
         return webSocketClient;
     }
 
-    private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, Class<T> clazz) throws Exception {
+    private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response,
+                                                     Class<T> clazz) throws Exception {
         StatusLine resStatus = response.getStatusLine();
         if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) {
             throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase());
@@ -272,6 +276,7 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     private static class CancelResult extends AjaxResult<Object> {
+
         private Object bizresult;
 
         @Override
@@ -285,6 +290,7 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     private static class BizResult extends AjaxResult<TriggerBuildResult> {
+
         private TriggerBuildResult bizresult;
 
         @Override
@@ -302,6 +308,7 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     private static class StatusResult extends AjaxResult<Map> {
+
         private Map bizresult;
 
         @Override
@@ -351,6 +358,7 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     private static class TriggerBuildResult {
+
         private int taskid;
 
         public int getTaskid() {
@@ -387,6 +395,7 @@ public class PigeonTask extends AbstractRemoteTask {
     }
 
     private static class ExecLog {
+
         private String logType;
         private String msg;
         private int taskId;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 22c898d355..8299f85cd2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -73,8 +73,8 @@ public class SagemakerTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b0e8aa5454..3d896adc58 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -17,11 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.task.seatunnel;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.BooleanUtils;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -33,6 +32,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -42,10 +44,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
 
 /**
  * seatunnel task
@@ -82,8 +80,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
     @Override
@@ -180,11 +178,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
     }
 
     private String buildConfigFilePath() {
-        return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+        return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTaskAppId());
     }
 
     private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
-        logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+        logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(),
+                taskExecutionContext.getExecutePath());
 
         if (!Files.exists(Paths.get(scriptFile))) {
             logger.info("generate script file:{}", scriptFile);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 75b6155587..5c0473154e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,11 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.zeppelin;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kong.unirest.Unirest;
-
 import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -29,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.spi.utils.DateUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
 import org.apache.zeppelin.client.ClientConfig;
 import org.apache.zeppelin.client.NoteResult;
 import org.apache.zeppelin.client.ParagraphResult;
@@ -39,7 +36,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import kong.unirest.Unirest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class ZeppelinTask extends AbstractRemoteTask {
 
@@ -235,8 +235,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
     }
 
     @Override
-    public Set<String> getApplicationIds() throws TaskException {
-        return Collections.emptySet();
+    public List<String> getApplicationIds() throws TaskException {
+        return Collections.emptyList();
     }
 
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 927920360a..ed351c3b86 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -25,6 +26,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -42,12 +44,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
-import org.apache.dolphinscheduler.service.log.LogClientService;
-
-import org.apache.commons.collections.CollectionUtils;
-
-
-
+import org.apache.dolphinscheduler.service.log.LogClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -65,15 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
 
-    /**
-     * task execute manager
-     */
     @Autowired
     private WorkerManagerThread workerManager;
 
     @Autowired
     private MessageRetryRunner messageRetryRunner;
 
+    @Autowired
+    private LogClient logClient;
+
     /**
      * task kill process
      *
@@ -92,12 +89,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         logger.info("task kill command : {}", killCommand);
 
         int taskInstanceId = killCommand.getTaskInstanceId();
-        TaskExecutionContext taskExecutionContext =
-            TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
-        if (taskExecutionContext == null) {
-            logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
-            return;
-        }
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+            TaskExecutionContext taskExecutionContext =
+                    TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+            if (taskExecutionContext == null) {
+                logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
+                return;
+            }
 
         int processId = taskExecutionContext.getProcessId();
         if (processId == 0) {
@@ -110,19 +109,22 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             return;
         }
 
-        // if processId > 0, it should call cancelApplication to cancel remote application too.
-        this.cancelApplication(taskInstanceId);
-        Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
+            // if processId > 0, it should call cancelApplication to cancel remote application too.
+            this.cancelApplication(taskInstanceId);
+            Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
 
         taskExecutionContext.setCurrentExecutionStatus(
             result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
         taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
         sendTaskKillResponseCommand(channel, taskExecutionContext);
 
-        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
+            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+            messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
 
-        logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
+            logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
+        }
     }
 
     private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@@ -228,10 +230,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
                 host, logPath, executePath, tenantCode);
             return Pair.of(false, Collections.emptyList());
         }
-        try (LogClientService logClient = new LogClientService()) {
+        try {
             logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
             List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath);
             if (CollectionUtils.isEmpty(appIds)) {
+                logger.info("The appId is empty");
                 return Pair.of(true, Collections.emptyList());
             }
 
@@ -241,7 +244,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             Thread.currentThread().interrupt();
             logger.error("kill yarn job error, the current thread has been interrtpted", e);
         } catch (Exception e) {
-            logger.error("kill yarn job error", e);
+            logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e);
         }
         return Pair.of(false, Collections.emptyList());
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 275fdf113d..9cb1a1e4f4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.google.common.base.Strings;
 import lombok.NonNull;
 import org.apache.dolphinscheduler.common.Constants;
@@ -35,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -50,6 +52,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
 import java.util.Date;
+import java.util.List;
 
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 
@@ -120,7 +123,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
         if (task != null) {
             try {
                 task.cancel();
-                ProcessUtils.killYarnJob(taskExecutionContext);
+                List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
+                if (CollectionUtils.isNotEmpty(appIds)) {
+                    ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+                }
             } catch (Exception e) {
                 logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
             }