You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:28 UTC

[dolphinscheduler] 01/29: Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 4ceb42087310575ffedd7b09fbf35a08614bb3b7
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 16 21:46:18 2022 +0800

    Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)
    
    * Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe
    
    (cherry picked from commit ad2646ff1f7baa5d76d29023ced2c28a89b52f6b)
---
 .../dolphinscheduler/common/utils/FileUtils.java   |  29 +++-
 .../dao/entity/ProcessInstance.java                |  12 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |   8 +-
 .../server/master/processor/queue/TaskEvent.java   | 105 +-----------
 .../processor/queue/TaskExecuteThreadPool.java     |  22 +--
 .../master/runner/WorkflowExecuteThread.java       | 176 +++++++++++++--------
 .../master/runner/WorkflowExecuteThreadPool.java   |   2 +-
 .../master/runner/task/CommonTaskProcessor.java    |   4 +-
 .../master/runner/task/TaskProcessorFactory.java   |  27 +++-
 .../runner/task/TaskProcessorFactoryTest.java      |   4 +-
 .../remote/utils/ChannelUtils.java                 |  12 +-
 .../apache/dolphinscheduler/remote/utils/Host.java |   2 +
 .../service/process/ProcessServiceImpl.java        |   5 +-
 .../queue/PeerTaskInstancePriorityQueue.java       |  35 ++--
 .../queue/PeerTaskInstancePriorityQueueTest.java   |   9 +-
 .../src/main/resources/application.yaml            |   4 +-
 .../plugin/task/shell/ShellTask.java               |   7 +-
 .../server/worker/config/WorkerConfig.java         |   4 +-
 .../worker/processor/TaskCallbackService.java      |   2 +
 .../server/worker/runner/TaskExecuteThread.java    |  24 ++-
 20 files changed, 247 insertions(+), 246 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index bdcf62f76a..23e4b74b75 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -17,14 +17,25 @@
 
 package org.apache.dolphinscheduler.common.utils;
 
+import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
+import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
+import static org.apache.dolphinscheduler.common.Constants.UTF_8;
+import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
+
 import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.NoSuchFileException;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * file utils
@@ -112,7 +123,15 @@ public class FileUtils {
         File execLocalPathFile = new File(execLocalPath);
 
         if (execLocalPathFile.exists()) {
-            org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+            try {
+                org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
+            } catch (Exception ex) {
+                if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) {
+                    // this file is already be deleted.
+                } else {
+                    throw ex;
+                }
+            }
         }
 
         //create work dir
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 8f639efc07..fb3ae13858 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -266,13 +266,11 @@ public class ProcessInstance {
      */
     public ProcessInstance(ProcessDefinition processDefinition) {
         this.processDefinition = processDefinition;
-        this.name = processDefinition.getName()
-            + "-"
-            +
-            processDefinition.getVersion()
-            + "-"
-            +
-            DateUtils.getCurrentTimeStamp();
+        // todo: the name is not unique
+        this.name = String.join("-",
+                processDefinition.getName(),
+                String.valueOf(processDefinition.getVersion()),
+                DateUtils.getCurrentTimeStamp());
     }
 
     public String getVarPool() {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f692685009..f9c19c48fd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -180,16 +180,18 @@ public class TaskPriorityQueueConsumer extends Thread {
                     return true;
                 }
             }
-
             result = dispatcher.dispatch(executionContext);
 
             if (result) {
+                logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
                 addDispatchEvent(context, executionContext);
+            } else {
+                logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
             }
         } catch (RuntimeException e) {
-            logger.error("dispatch error: ", e);
+            logger.error("Master dispatch task to worker error: ", e);
         } catch (ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage());
+            logger.error("Master dispatch task to worker error: {}", e);
         }
         return result;
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 865eee53a5..86cce5170e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -26,10 +26,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import java.util.Date;
 
 import io.netty.channel.Channel;
+import lombok.Data;
 
 /**
  * task event
  */
+@Data
 public class TaskEvent {
 
     /**
@@ -135,107 +137,4 @@ public class TaskEvent {
         return event;
     }
 
-    public String getVarPool() {
-        return varPool;
-    }
-
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public int getTaskInstanceId() {
-        return taskInstanceId;
-    }
-
-    public void setTaskInstanceId(int taskInstanceId) {
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    public String getWorkerAddress() {
-        return workerAddress;
-    }
-
-    public void setWorkerAddress(String workerAddress) {
-        this.workerAddress = workerAddress;
-    }
-
-    public ExecutionStatus getState() {
-        return state;
-    }
-
-    public void setState(ExecutionStatus state) {
-        this.state = state;
-    }
-
-    public Date getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(Date startTime) {
-        this.startTime = startTime;
-    }
-
-    public Date getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(Date endTime) {
-        this.endTime = endTime;
-    }
-
-    public String getExecutePath() {
-        return executePath;
-    }
-
-    public void setExecutePath(String executePath) {
-        this.executePath = executePath;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
-    public int getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(int processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
-    }
-
-    public Event getEvent() {
-        return event;
-    }
-
-    public void setEvent(Event event) {
-        this.event = event;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel) {
-        this.channel = channel;
-    }
-
-    public int getProcessInstanceId() {
-        return processInstanceId;
-    }
-
-    public void setProcessInstanceId(int processInstanceId) {
-        this.processInstanceId = processInstanceId;
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 75c0272519..c9c2868d36 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
 
     public void submitTaskEvent(TaskEvent taskEvent) {
         if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
-            logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
+            logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
             return;
         }
         if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
         future.addCallback(new ListenableFutureCallback() {
             @Override
             public void onFailure(Throwable ex) {
-                logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
-                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
-                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
-                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
+                logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
+                if (!processInstanceExecCacheManager.contains(processInstanceId)) {
+                    taskExecuteThreadMap.remove(processInstanceId);
+                    logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
+                            processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
 
             @Override
             public void onSuccess(Object result) {
-                logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
-                if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
-                    taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
-                    logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
+                Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
+                logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
+                if (!processInstanceExecCacheManager.contains(processInstanceId)) {
+                    taskExecuteThreadMap.remove(processInstanceId);
+                    logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
+                            processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 993e1df14b..fede5abad2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -75,8 +75,8 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -87,6 +87,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -238,12 +239,12 @@ public class WorkflowExecuteThread {
      * @param masterConfig            masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
-    public WorkflowExecuteThread(ProcessInstance processInstance
-        , ProcessService processService
-        , NettyExecutorManager nettyExecutorManager
-        , ProcessAlertManager processAlertManager
-        , MasterConfig masterConfig
-        , StateWheelExecuteThread stateWheelExecuteThread) {
+    public WorkflowExecuteThread(ProcessInstance processInstance,
+                                 ProcessService processService,
+                                 NettyExecutorManager nettyExecutorManager,
+                                 ProcessAlertManager processAlertManager,
+                                 MasterConfig masterConfig,
+                                 StateWheelExecuteThread stateWheelExecuteThread) {
         this.processService = processService;
         this.processInstance = processInstance;
         this.masterConfig = masterConfig;
@@ -279,15 +280,14 @@ public class WorkflowExecuteThread {
     }
 
     public String getKey() {
-        if (StringUtils.isNotEmpty(key)
-            || this.processDefinition == null) {
+        if (StringUtils.isNotEmpty(key) || this.processDefinition == null) {
             return key;
         }
 
         key = String.format("%d_%d_%d",
-            this.processDefinition.getCode(),
-            this.processDefinition.getVersion(),
-            this.processInstance.getId());
+                            this.processDefinition.getCode(),
+                            this.processDefinition.getVersion(),
+                            this.processInstance.getId());
         return key;
     }
 
@@ -436,10 +436,10 @@ public class WorkflowExecuteThread {
 
     private void taskFinished(TaskInstance taskInstance) {
         logger.info("work flow {} task id:{} code:{} state:{} ",
-            processInstance.getId(),
-            taskInstance.getId(),
-            taskInstance.getTaskCode(),
-            taskInstance.getState());
+                    processInstance.getId(),
+                    taskInstance.getId(),
+                    taskInstance.getTaskCode(),
+                    taskInstance.getState());
 
         activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
         stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@@ -460,7 +460,7 @@ public class WorkflowExecuteThread {
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
             // There are child nodes and the failure policy is: CONTINUE
             if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
-                    && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
+                && processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
                 submitPostNode(Long.toString(taskInstance.getTaskCode()));
             } else {
                 errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@@ -492,8 +492,9 @@ public class WorkflowExecuteThread {
                     this.stateEvents.add(nextEvent);
                 } else {
                     ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-                    this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
-                        org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
+                    this.processService.sendStartTask2Master(processInstance,
+                                                             nextTaskInstance.getId(),
+                                                             org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
                 }
             }
         }
@@ -515,7 +516,8 @@ public class WorkflowExecuteThread {
         }
         waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
         if (!taskInstance.retryTaskIntervalOverTime()) {
-            logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
+            logger.info(
+                "failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
                 processInstance.getId(),
                 newTaskInstance.getTaskCode(),
                 newTaskInstance.getState(),
@@ -552,7 +554,7 @@ public class WorkflowExecuteThread {
         logger.info("process instance update: {}", processInstanceId);
         processInstance = processService.findProcessInstanceById(processInstanceId);
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
+                                                                 processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
     }
 
@@ -580,9 +582,7 @@ public class WorkflowExecuteThread {
      */
     public boolean checkProcessInstance(StateEvent stateEvent) {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
-            logger.error("mismatch process instance id: {}, state event:{}",
-                this.processInstance.getId(),
-                stateEvent);
+            logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent);
             return false;
         }
         return true;
@@ -742,9 +742,9 @@ public class WorkflowExecuteThread {
                 return true;
             }
             logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}",
-                processInstance.getId(),
-                processInstance.getScheduleTime(),
-                complementListDate.toString());
+                        processInstance.getId(),
+                        processInstance.getScheduleTime(),
+                        complementListDate.toString());
             scheduleDate = complementListDate.get(index + 1);
         }
         //the next process complement
@@ -783,8 +783,7 @@ public class WorkflowExecuteThread {
     }
 
     private boolean needComplementProcess() {
-        if (processInstance.isComplementData()
-            && Flag.NO == processInstance.getIsSubProcess()) {
+        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
             return true;
         }
         return false;
@@ -863,7 +862,7 @@ public class WorkflowExecuteThread {
             return;
         }
         processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-            processInstance.getProcessDefinitionVersion());
+                                                                 processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);
 
         List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
@@ -883,7 +882,9 @@ public class WorkflowExecuteThread {
         List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
         List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
         ProcessDag processDag = generateFlowDag(taskNodeList,
-            startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
+                                                startNodeNameList,
+                                                recoveryNodeCodeList,
+                                                processInstance.getTaskDependType());
         if (processDag == null) {
             logger.error("processDag is null");
             return;
@@ -955,14 +956,16 @@ public class WorkflowExecuteThread {
                 if (complementListDate.size() == 0 && needComplementProcess()) {
                     complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
                     logger.info(" process definition code:{} complement data: {}",
-                        processInstance.getProcessDefinitionCode(), complementListDate.toString());
+                                processInstance.getProcessDefinitionCode(),
+                                complementListDate.toString());
 
                     if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) {
                         processInstance.setScheduleTime(complementListDate.get(0));
-                        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                            processDefinition.getGlobalParamMap(),
-                            processDefinition.getGlobalParamList(),
-                            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
+                        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(processDefinition.getGlobalParamMap(),
+                                                                                          processDefinition.getGlobalParamList(),
+                                                                                          CommandType.COMPLEMENT_DATA,
+                                                                                          processInstance.getScheduleTime(),
+                                                                                          cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
                         processService.updateProcessInstance(processInstance);
                     }
                 }
@@ -976,7 +979,7 @@ public class WorkflowExecuteThread {
      * @param taskInstance task instance
      * @return TaskInstance
      */
-    private TaskInstance submitTaskExec(TaskInstance taskInstance) {
+    private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
         try {
             // package task instance before submit
             processService.packageTaskInstance(taskInstance, processInstance);
@@ -984,17 +987,17 @@ public class WorkflowExecuteThread {
             ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
             taskProcessor.init(taskInstance, processInstance);
 
-            if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
-                && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
+            if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType()
+                .equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
                 notifyProcessHostUpdate(taskInstance);
             }
 
             boolean submit = taskProcessor.action(TaskAction.SUBMIT);
             if (!submit) {
                 logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
-                    processInstance.getId(), processInstance.getName(),
-                    taskInstance.getId(), taskInstance.getName());
-                return null;
+                        processInstance.getId(), processInstance.getName(),
+                        taskInstance.getId(), taskInstance.getName());
+                return Optional.empty();
             }
 
             // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
@@ -1033,10 +1036,10 @@ public class WorkflowExecuteThread {
                 taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
                 this.stateEvents.add(taskStateChangeEvent);
             }
-            return taskInstance;
+            return Optional.of(taskInstance);
         } catch (Exception e) {
             logger.error("submit standby task error", e);
-            return null;
+            return Optional.empty();
         }
     }
 
@@ -1333,6 +1336,7 @@ public class WorkflowExecuteThread {
         for (TaskInstance task : taskInstances) {
 
             if (readyToSubmitTaskQueue.contains(task)) {
+                logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
                 continue;
             }
 
@@ -1659,11 +1663,12 @@ public class WorkflowExecuteThread {
     private void updateProcessInstanceState() {
         ExecutionStatus state = getProcessInstanceState(processInstance);
         if (processInstance.getState() != state) {
-            logger.info(
-                "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                processInstance.getId(), processInstance.getName(),
-                processInstance.getState(), state,
-                processInstance.getCommandType());
+            logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                        processInstance.getId(),
+                        processInstance.getName(),
+                        processInstance.getState(),
+                        state,
+                        processInstance.getCommandType());
 
             processInstance.setState(state);
             if (state.typeIsFinished()) {
@@ -1687,11 +1692,12 @@ public class WorkflowExecuteThread {
     private void updateProcessInstanceState(StateEvent stateEvent) {
         ExecutionStatus state = stateEvent.getExecutionStatus();
         if (processInstance.getState() != state) {
-            logger.info(
-                "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
-                processInstance.getId(), processInstance.getName(),
-                processInstance.getState(), state,
-                processInstance.getCommandType());
+            logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
+                        processInstance.getId(),
+                        processInstance.getName(),
+                        processInstance.getState(),
+                        state,
+                        processInstance.getCommandType());
 
             processInstance.setState(state);
             if (state.typeIsFinished()) {
@@ -1723,7 +1729,9 @@ public class WorkflowExecuteThread {
                 return;
             }
             logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
-                taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
+                        taskInstance.getName(),
+                        taskInstance.getId(),
+                        taskInstance.getTaskCode());
             readyToSubmitTaskQueue.put(taskInstance);
         } catch (Exception e) {
             logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
@@ -1736,15 +1744,14 @@ public class WorkflowExecuteThread {
      * @param taskInstance task instance
      */
     private void removeTaskFromStandbyList(TaskInstance taskInstance) {
-        logger.info("remove task from stand by list, id: {} name:{}",
-            taskInstance.getId(),
-            taskInstance.getName());
+        logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName());
         try {
             readyToSubmitTaskQueue.remove(taskInstance);
         } catch (Exception e) {
             logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
-                taskInstance.getId(),
-                taskInstance.getName(), e);
+                         taskInstance.getId(),
+                         taskInstance.getName(),
+                         e);
         }
     }
 
@@ -1766,8 +1773,9 @@ public class WorkflowExecuteThread {
      * close the on going tasks
      */
     private void killAllTasks() {
-        logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(),
-            activeTaskProcessorMaps.size());
+        logger.info("kill called on process instance id: {}, num: {}",
+                    processInstance.getId(),
+                    activeTaskProcessorMaps.size());
 
         if (readyToSubmitTaskQueue.size() > 0) {
             readyToSubmitTaskQueue.clear();
@@ -1831,14 +1839,16 @@ public class WorkflowExecuteThread {
                 }
                 DependResult dependResult = getDependResultForTask(task);
                 if (DependResult.SUCCESS == dependResult) {
-                    TaskInstance taskInstance = submitTaskExec(task);
-                    if (taskInstance == null) {
+                    Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
+                    if (!taskInstanceOptional.isPresent()) {
                         this.taskFailedSubmit = true;
                         // Remove and add to complete map and error map
                         removeTaskFromStandbyList(task);
                         completeTaskMap.put(task.getTaskCode(), task.getId());
                         errorTaskMap.put(task.getTaskCode(), task.getId());
-                        logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
+                        logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}",
+                                     task.getProcessInstanceId(),
+                                     task.getId());
                     } else {
                         removeTaskFromStandbyList(task);
                     }
@@ -1846,11 +1856,15 @@ public class WorkflowExecuteThread {
                     // if the dependency fails, the current node is not submitted and the state changes to failure.
                     dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                     removeTaskFromStandbyList(task);
-                    logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
+                    logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}",
+                                task.getId(),
+                                dependResult);
                 } else if (DependResult.NON_EXEC == dependResult) {
                     // for some reasons(depend task pause/stop) this task would not be submit
                     removeTaskFromStandbyList(task);
-                    logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
+                    logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}",
+                                task.getId(),
+                                dependResult);
                 }
             }
         } catch (Exception e) {
@@ -2009,4 +2023,30 @@ public class WorkflowExecuteThread {
             }
         }
     }
+
 }
+
+    private void measureTaskState(StateEvent taskStateEvent) {
+        if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
+            // the event is broken
+            logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
+            return;
+        }
+        if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
+            TaskMetrics.incTaskFinish();
+        }
+        switch (taskStateEvent.getExecutionStatus()) {
+            case STOP:
+                TaskMetrics.incTaskStop();
+                break;
+            case SUCCESS:
+                TaskMetrics.incTaskSuccess();
+                break;
+            case FAILURE:
+                TaskMetrics.incTaskFailure();
+                break;
+            default:
+                break;
+        }
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index cd337b4c95..556f36a1b7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -100,7 +100,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * execute workflow
      */
-    public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
+    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
         if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
             return;
         }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ffeb89a0d2..81e3070089 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -110,7 +110,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
                 logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
                 return true;
             }
-            logger.info("task ready to submit: {}", taskInstance);
+            logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
 
             TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
                     processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             taskPriority.setTaskExecutionContext(taskExecutionContext);
 
             taskUpdateQueue.put(taskPriority);
-            logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
+            logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
             return true;
         } catch (Exception e) {
             logger.error("submit task error", e);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 0129338649..d5bd131a10 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -21,36 +21,47 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
-import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * the factory to create task processor
  */
 public class TaskProcessorFactory {
 
-    public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
+    private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
+
+    public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
 
     private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
 
     static {
         for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
-            PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
+            try {
+                PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
+            } catch (NoSuchMethodException e) {
+                throw new IllegalArgumentException("The task processor should has a no args constructor");
+            }
         }
     }
 
-    public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
+    public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException {
         if (StringUtils.isEmpty(type)) {
             type = DEFAULT_PROCESSOR;
         }
-        ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
-        if (Objects.isNull(iTaskProcessor)) {
-            iTaskProcessor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
+        Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
+        if (iTaskProcessorConstructor == null) {
+            logger.warn("ITaskProcessor could not found for taskType: {}", type);
+            iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
         }
 
-        return iTaskProcessor.getClass().newInstance();
+        return iTaskProcessorConstructor.newInstance();
     }
 
     /**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
index d0371809cc..b974a40b31 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
+import java.lang.reflect.InvocationTargetException;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class TaskProcessorFactoryTest {
 
     @Test
-    public void testFactory() throws InstantiationException, IllegalAccessException {
+    public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException {
 
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setTaskType("shell");
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
index 239a3993c0..b4177ec25d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 
 import java.net.InetSocketAddress;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.channel.Channel;
 
 /**
@@ -28,6 +31,8 @@ import io.netty.channel.Channel;
  */
 public class ChannelUtils {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);
+
     private ChannelUtils() {
         throw new IllegalStateException(ChannelUtils.class.getName());
     }
@@ -49,7 +54,7 @@ public class ChannelUtils {
      * @return remote address
      */
     public static String getRemoteAddress(Channel channel) {
-        return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress());
+        return toAddress(channel).getAddress();
     }
 
     /**
@@ -60,6 +65,11 @@ public class ChannelUtils {
      */
     public static Host toAddress(Channel channel) {
         InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
+        if (socketAddress == null) {
+            // the remote channel already closed
+            LOGGER.warn("The channel is already closed");
+            return Host.EMPTY;
+        }
         return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
     }
 
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index 2163e9c7d8..dc8e1f0d36 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -27,6 +27,8 @@ import java.util.Objects;
  */
 public class Host implements Serializable {
 
+    public static final Host EMPTY = new Host();
+
     /**
      * address
      */
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 48bb9c624d..eba7ced00a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -748,7 +748,8 @@ public class ProcessServiceImpl implements ProcessService {
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         processInstance.setRecovery(Flag.NO);
         processInstance.setStartTime(new Date());
-        processInstance.setRestartTime(processInstance.getStartTime());
+        // the new process instance restart time is null.
+        processInstance.setRestartTime(null);
         processInstance.setRunTimes(1);
         processInstance.setMaxTryTimes(0);
         processInstance.setCommandParam(command.getCommandParam());
@@ -1266,7 +1267,7 @@ public class ProcessServiceImpl implements ProcessService {
     @Override
     @Transactional(rollbackFor = Exception.class)
     public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
-        logger.info("start submit task : {}, instance id:{}, state: {}",
+        logger.info("start submit task : {}, processInstance id:{}, state: {}",
             taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
index 231fd2a20f..2e939ee332 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
@@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
 
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Task instances priority queue implementation
@@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     /**
      * queue
      */
-    private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
-
-    /**
-     * Lock used for all public operations
-     */
-    private final ReentrantLock lock = new ReentrantLock(true);
+    private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
+    private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>());
 
     /**
      * put task instance to priority queue
@@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
     @Override
     public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
         queue.add(taskInstance);
+        taskInstanceIdSet.add(taskInstance.getId());
     }
 
     /**
@@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      */
     @Override
     public TaskInstance take() throws TaskPriorityQueueException {
-        return queue.poll();
+        TaskInstance taskInstance = queue.poll();
+        if (taskInstance != null) {
+            taskInstanceIdSet.remove(taskInstance.getId());
+        }
+        return taskInstance;
     }
 
     /**
@@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      */
     public void clear() {
         queue.clear();
+        taskInstanceIdSet.clear();
     }
 
     /**
@@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
      * @return true is contains
      */
     public boolean contains(TaskInstance taskInstance) {
-        return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
+        return this.contains(taskInstance.getId());
     }
 
-    public boolean contains(long taskCode, int taskVersion) {
-        Iterator<TaskInstance> iterator = this.queue.iterator();
-        while (iterator.hasNext()) {
-            TaskInstance taskInstance = iterator.next();
-            if (taskCode == taskInstance.getTaskCode()
-                    && taskVersion == taskInstance.getTaskDefinitionVersion()) {
-                return true;
-            }
-        }
-        return false;
-
+    public boolean contains(int taskInstanceId) {
+        return taskInstanceIdSet.contains(taskInstanceId);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
index 8da3a6c194..67e40d1189 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
@@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
         Assert.assertTrue(queue.size() < peekBeforeLength);
     }
 
-    @Test
+
+    @Test(expected = TaskPriorityQueueException.class)
     public void poll() throws Exception {
         PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
-        try {
-            queue.poll(1000, TimeUnit.MILLISECONDS);
-        } catch (TaskPriorityQueueException e) {
-            e.printStackTrace();
-        }
+        queue.poll(1000, TimeUnit.MILLISECONDS);
     }
 
     @Test
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index c50883da62..7be661de4e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -109,7 +109,7 @@ master:
   # master prepare execute thread number to limit handle commands in parallel
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
-  exec-threads: 100
+  exec-threads: 10
   # master dispatch task number per batch
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
@@ -134,7 +134,7 @@ worker:
   # worker listener port
   listen-port: 1234
   # worker execute thread number to limit task instances in parallel
-  exec-threads: 100
+  exec-threads: 10
   # worker heartbeat interval, the unit is second
   heartbeat-interval: 10
   # worker host weight to dispatch tasks, default value 100
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 737b876b74..9d084d0ec8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.commons.collections4.MapUtils;
 
 import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
             if (!file.getParentFile().exists()) {
                 file.getParentFile().mkdirs();
             }
-            Files.createFile(path, attr);
+            try {
+                Files.createFile(path, attr);
+            } catch (FileAlreadyExistsException ex) {
+                // this is expected
+            }
         }
 
         Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 5192e27d61..6edff09f97 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -21,9 +21,9 @@ import java.util.Set;
 
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.stereotype.Component;
+import org.springframework.context.annotation.Configuration;
 
-@Component
+@Configuration
 @EnableConfigurationProperties
 @ConfigurationProperties("worker")
 public class WorkerConfig {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 3641de8453..2a92222b0a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -162,6 +162,8 @@ public class TaskCallbackService {
                     }
                 }
             });
+        } else {
+            logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index cc15eb62f4..e6ff2b5653 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
 import org.apache.dolphinscheduler.common.storage.StorageOperate;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -40,10 +43,17 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -233,7 +243,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
                 org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
                 logger.info("exec local path: {} cleared.", execLocalPath);
             } catch (IOException e) {
-                logger.error("delete exec dir failed : {}", e.getMessage(), e);
+                if (e instanceof NoSuchFileException) {
+                    // this is expected
+                } else {
+                    logger.error("Delete exec dir failed.", e);
+                }
             }
         }
     }
@@ -264,7 +278,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
                 task.cancelApplication(true);
                 ProcessUtils.killYarnJob(taskExecutionContext);
             } catch (Exception e) {
-                logger.error(e.getMessage(), e);
+                logger.error("Kill task failed", e);
             }
         }
     }