You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:37 UTC

[dolphinscheduler] 10/29: Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 3ab9ee13fcffcfe69de90353ad43f8eb86c015a6
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu Jun 23 11:45:06 2022 +0800

    Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516)
    
    * Optimize master log, add workflow instance id and task instance id in log
    
    * Use MDC to set the workflow info in log4j
    
    * Add workflowInstanceId and taskInstanceId in MDC
    
    (cherry picked from commit db595b3eff8f5e4728665b4b7b7082af710e8f9d)
---
 .../apache/dolphinscheduler/alert/AlertServer.java |  23 ++-
 .../apache/dolphinscheduler/common/Constants.java  |  36 ++--
 .../dolphinscheduler/common/thread/Stopper.java    |  34 +++-
 .../common/thread/ThreadUtils.java                 |  18 +-
 .../dolphinscheduler/common/utils/LoggerUtils.java |  34 +++-
 .../server/master/MasterServer.java                |  25 +--
 .../master/processor/StateEventProcessor.java      |  12 +-
 .../master/processor/TaskEventProcessor.java       |  14 +-
 .../processor/TaskExecuteResponseProcessor.java    |  12 +-
 .../processor/TaskKillResponseProcessor.java       |   3 +-
 .../processor/queue/StateEventResponseService.java |  23 ++-
 .../processor/queue/TaskExecuteRunnable.java       |   3 +
 .../processor/queue/TaskExecuteThreadPool.java     |  12 +-
 .../master/registry/MasterRegistryClient.java      |  15 +-
 .../server/master/runner/EventExecuteService.java  |  18 +-
 .../master/runner/FailoverExecuteThread.java       |   5 +-
 .../master/runner/MasterSchedulerService.java      |  60 ++++--
 .../master/runner/StateWheelExecuteThread.java     | 222 +++++++++++----------
 .../master/runner/WorkflowExecuteRunnable.java     |  24 ++-
 .../master/runner/WorkflowExecuteThreadPool.java   |  29 ++-
 .../master/runner/task/TaskProcessorFactory.java   |   9 +-
 .../server/master/service/FailoverService.java     |  24 ++-
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../service/alert/AlertClientService.java          |  21 +-
 .../service/task/TaskPluginManager.java            |   5 +-
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../plugin/task/api/ProcessUtils.java              |  15 +-
 .../plugin/task/api/TaskPluginException.java       |  23 +--
 .../server/worker/WorkerServer.java                | 117 +++--------
 .../server/worker/prc/WorkerRpcServer.java         |  97 +++++++++
 .../worker/processor/TaskExecuteProcessor.java     | 169 +++++++++-------
 .../processor/TaskExecuteRunningAckProcessor.java  |  17 +-
 .../worker/runner/RetryReportTaskStatusThread.java |   4 +-
 .../server/worker/runner/TaskExecuteThread.java    |  31 ++-
 .../server/worker/runner/WorkerManagerThread.java  |   5 +-
 .../src/main/resources/logback-spring.xml          |   4 +-
 .../worker/processor/TaskExecuteProcessorTest.java |  42 ++--
 .../worker/runner/TaskExecuteThreadTest.java       |  27 ++-
 38 files changed, 747 insertions(+), 493 deletions(-)

diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ee9d5b3f62..5f449fcab4 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -49,7 +49,10 @@ public class AlertServer implements Closeable {
     private final AlertConfig alertConfig;
     private NettyRemotingServer nettyRemotingServer;
 
-    public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
+    public AlertServer(PluginDao pluginDao,
+                       AlertSenderService alertSenderService,
+                       AlertRequestProcessor alertRequestProcessor,
+                       AlertConfig alertConfig) {
         this.pluginDao = pluginDao;
         this.alertSenderService = alertSenderService;
         this.alertRequestProcessor = alertRequestProcessor;
@@ -68,11 +71,12 @@ public class AlertServer implements Closeable {
 
     @EventListener
     public void run(ApplicationReadyEvent readyEvent) {
-        logger.info("alert server starting...");
+        logger.info("Alert server is starting ...");
 
         checkTable();
         startServer();
         alertSenderService.start();
+        logger.info("Alert server is started ...");
     }
 
     @Override
@@ -89,24 +93,23 @@ public class AlertServer implements Closeable {
     public void destroy(String cause) {
 
         try {
+            // set the stop signal to true
             // execute only once
-            if (Stopper.isStopped()) {
+            if (!Stopper.stop()) {
+                logger.warn("AlertServer is already stopped");
                 return;
             }
 
-            logger.info("alert server is stopping ..., cause : {}", cause);
-
-            // set stop signal is true
-            Stopper.stop();
+            logger.info("Alert server is stopping, cause: {}", cause);
 
             // thread sleep 3 seconds for thread quietly stop
-            ThreadUtils.sleep(3000L);
+            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
 
             // close
             this.nettyRemotingServer.close();
-
+            logger.info("Alert server stopped, cause: {}", cause);
         } catch (Exception e) {
-            logger.error("alert server stop exception ", e);
+            logger.error("Alert server stop failed, cause: {}", cause, e);
         }
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index b7714c7817..f39a0c94aa 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
 
+import java.time.Duration;
 import java.util.regex.Pattern;
 
 /**
@@ -376,6 +377,8 @@ public final class Constants {
      */
     public static final long SLEEP_TIME_MILLIS_SHORT = 100L;
 
+    public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3);
+
     /**
      * one second mils
      */
@@ -636,28 +639,31 @@ public final class Constants {
      */
     public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path";
 
+    public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId";
+    public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
+
     /**
      * task log info format
      */
     public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
 
-    public static final int[] NOT_TERMINATED_STATES = new int[]{
-            ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-            ExecutionStatus.DISPATCH.ordinal(),
-            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-            ExecutionStatus.DELAY_EXECUTION.ordinal(),
-            ExecutionStatus.READY_PAUSE.ordinal(),
-            ExecutionStatus.READY_STOP.ordinal(),
-            ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
-            ExecutionStatus.WAITING_THREAD.ordinal(),
-            ExecutionStatus.WAITING_DEPEND.ordinal()
+    public static final int[] NOT_TERMINATED_STATES = new int[] {
+        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+        ExecutionStatus.DISPATCH.ordinal(),
+        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+        ExecutionStatus.DELAY_EXECUTION.ordinal(),
+        ExecutionStatus.READY_PAUSE.ordinal(),
+        ExecutionStatus.READY_STOP.ordinal(),
+        ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
+        ExecutionStatus.WAITING_THREAD.ordinal(),
+        ExecutionStatus.WAITING_DEPEND.ordinal()
     };
 
-    public static final int[] RUNNING_PROCESS_STATE = new int[]{
-            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-            ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-            ExecutionStatus.DISPATCH.ordinal(),
-            ExecutionStatus.SERIAL_WAIT.ordinal()
+    public static final int[] RUNNING_PROCESS_STATE = new int[] {
+        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+        ExecutionStatus.DISPATCH.ordinal(),
+        ExecutionStatus.SERIAL_WAIT.ordinal()
     };
 
     /**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
index 67abde7e7a..777203c0f3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
@@ -19,22 +19,40 @@ package org.apache.dolphinscheduler.common.thread;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.experimental.UtilityClass;
+
 /**
- *  if the process closes, a signal is placed as true, and all threads get this flag to stop working
+ * When the process closes, the stop signal is set to true, and all threads check this flag to stop working.
  */
+@UtilityClass
 public class Stopper {
 
-    private static AtomicBoolean signal = new AtomicBoolean(false);
+    private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false);
 
-    public static final boolean isStopped() {
-        return signal.get();
+    /**
+     * Return the flag if the Server is stopped.
+     *
+     * @return True, if the server is stopped; False, the server is still running.
+     */
+    public static boolean isStopped() {
+        return stoppedSignal.get();
     }
 
-    public static final boolean isRunning() {
-        return !signal.get();
+    /**
+     * Return whether the server is still running.
+     *
+     * @return True, if the server is running; False, if the server is stopped.
+     */
+    public static boolean isRunning() {
+        return !stoppedSignal.get();
     }
 
-    public static final void stop() {
-        signal.set(true);
+    /**
+     * Stop the server
+     *
+     * @return True, if this call stopped the server; False, if the server was already stopped.
+     */
+    public static boolean stop() {
+        return stoppedSignal.compareAndSet(false, true);
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 75f624d15d..5c8020b7cd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -23,24 +23,28 @@ import java.util.concurrent.ThreadFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-/**
- * thread utils
- */
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
 public class ThreadUtils {
     /**
      * Wrapper over newDaemonFixedThreadExecutor.
+     *
      * @param threadName threadName
      * @param threadsNum threadsNum
      * @return ExecutorService
      */
-    public static ExecutorService newDaemonFixedThreadExecutor(String threadName,int threadsNum) {
+    public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
         ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(threadName)
-                .build();
+            .setDaemon(true)
+            .setNameFormat(threadName)
+            .build();
         return Executors.newFixedThreadPool(threadsNum, threadFactory);
     }
 
+    /**
+     * Sleep for the given number of milliseconds; the actual sleep duration is not exact.
+     */
     public static void sleep(final long millis) {
         try {
             Thread.sleep(millis);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 8dd4969494..8c749c53fc 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -32,16 +32,16 @@ import java.util.regex.Pattern;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import lombok.experimental.UtilityClass;
 
 /**
  * logger utils
  */
+@UtilityClass
 public class LoggerUtils {
 
-    private LoggerUtils() {
-        throw new UnsupportedOperationException("Construct LoggerUtils");
-    }
-
     private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
 
     /**
@@ -109,4 +109,30 @@ public class LoggerUtils {
         }
         return "";
     }
+
+    public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) {
+        setWorkflowInstanceIdMDC(workflowInstanceId);
+        setTaskInstanceIdMDC(taskInstanceId);
+    }
+
+    public static void setWorkflowInstanceIdMDC(int workflowInstanceId) {
+        MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
+    }
+
+    public static void setTaskInstanceIdMDC(int taskInstanceId) {
+        MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
+    }
+
+    public static void removeWorkflowAndTaskInstanceIdMDC() {
+        removeWorkflowInstanceIdMDC();
+        removeTaskInstanceIdMDC();
+    }
+
+    public static void removeWorkflowInstanceIdMDC() {
+        MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
+    }
+
+    public static void removeTaskInstanceIdMDC() {
+        MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index cb01e14504..95409a616b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
@@ -103,7 +104,7 @@ public class MasterServer implements IStoppable {
 
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             if (Stopper.isRunning()) {
-                close("shutdownHook");
+                close("MasterServer shutdownHook");
             }
         }));
     }
@@ -116,23 +117,17 @@ public class MasterServer implements IStoppable {
     public void close(String cause) {
 
         try {
+            // set the stop signal to true
             // execute only once
-            if (Stopper.isStopped()) {
-                logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
+            if (!Stopper.stop()) {
+                logger.warn("MasterServer is already stopped, current cause: {}", cause);
                 return;
             }
 
-            logger.info("master server is stopping ..., cause : {}", cause);
-
-            // set stop signal is true
-            Stopper.stop();
+            logger.info("Master server is stopping, current cause : {}", cause);
 
-            try {
-                // thread sleep 3 seconds for thread quietly stop
-                Thread.sleep(3000L);
-            } catch (Exception e) {
-                logger.warn("thread sleep exception ", e);
-            }
+            // sleep for SERVER_CLOSE_WAIT_TIME (3 seconds) so that threads can stop quietly
+            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
             // close
             this.masterSchedulerService.close();
             this.masterRPCServer.close();
@@ -141,9 +136,9 @@ public class MasterServer implements IStoppable {
             // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
             springApplicationContext.close();
 
-            logger.info("MasterServer stopped...");
+            logger.info("MasterServer stopped, current cause: {}", cause);
         } catch (Exception e) {
-            logger.error("master server stop exception ", e);
+            logger.error("MasterServer stop failed, current cause: {}", cause, e);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index e132f285de..b2c47e4112 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -64,8 +65,15 @@ public class StateEventProcessor implements NettyRequestProcessor {
         StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
         stateEvent.setType(type);
 
-        logger.info("received command : {}", stateEvent);
-        stateEventResponseService.addResponse(stateEvent);
+        try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
+
+            logger.info("Received state event change command, event: {}", stateEvent);
+            stateEventResponseService.addResponse(stateEvent);
+        }finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
+
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
index ea2e4539c3..e597f1cae5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
@@ -49,8 +50,8 @@ public class TaskEventProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
-                        || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
-                , String.format("invalid command type: %s", command.getType()));
+                || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
+            , String.format("invalid command type: %s", command.getType()));
 
         TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class);
         StateEvent stateEvent = new StateEvent();
@@ -58,8 +59,13 @@ public class TaskEventProcessor implements NettyRequestProcessor {
         stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId());
         stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId());
         stateEvent.setType(StateEventType.WAIT_TASK_GROUP);
-        logger.info("received command : {}", stateEvent);
-        stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
+        try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
+            logger.info("Received task event change command, event: {}", stateEvent);
+            stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
+        } finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 264e42e3a5..b17def873c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -57,9 +58,14 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
         TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
-        logger.info("received command : {}", taskExecuteResponseCommand);
-
         TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
-        taskEventService.addEvent(taskResponseEvent);
+        try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
+            logger.info("Received task execute response, event: {}", taskResponseEvent);
+
+            taskEventService.addEvent(taskResponseEvent);
+        } finally {
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index 135257c9a8..6079329b90 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -51,7 +51,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
         TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
-        logger.info("received task kill response command : {}", responseCommand);
+        logger.info("[TaskInstance-{}] Received task kill response command : {}",
+            responseCommand.getTaskInstanceId(), responseCommand);
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
index e34dedca67..4ca6b9eccb 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -81,7 +82,13 @@ public class StateEventResponseService {
             List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size());
             eventQueue.drainTo(remainEvents);
             for (StateEvent event : remainEvents) {
-                this.persist(event);
+                try {
+                    LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
+                    this.persist(event);
+
+                } finally {
+                    LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+                }
             }
         }
     }
@@ -93,7 +100,7 @@ public class StateEventResponseService {
         try {
             eventQueue.put(stateEvent);
         } catch (InterruptedException e) {
-            logger.error("put state event : {} error :{}", stateEvent, e);
+            logger.error("Put state event : {} error", stateEvent, e);
             Thread.currentThread().interrupt();
         }
     }
@@ -109,18 +116,22 @@ public class StateEventResponseService {
 
         @Override
         public void run() {
-
+            logger.info("State event loop service started");
             while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
                     StateEvent stateEvent = eventQueue.take();
+                    LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
                     persist(stateEvent);
                 } catch (InterruptedException e) {
+                    logger.warn("State event loop service interrupted, will stop this loop", e);
                     Thread.currentThread().interrupt();
                     break;
+                } finally {
+                    LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
-            logger.info("StateEventResponseWorker stopped");
+            logger.info("State event loop service stopped");
         }
     }
 
@@ -135,6 +146,8 @@ public class StateEventResponseService {
     private void persist(StateEvent stateEvent) {
         try {
             if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
+                logger.warn("Persist event into workflow execute thread error, "
+                    + "cannot find the workflow instance from cache manager, event: {}", stateEvent);
                 writeResponse(stateEvent, ExecutionStatus.FAILURE);
                 return;
             }
@@ -152,7 +165,7 @@ public class StateEventResponseService {
             workflowExecuteThreadPool.submitStateEvent(stateEvent);
             writeResponse(stateEvent, ExecutionStatus.SUCCESS);
         } catch (Exception e) {
-            logger.error("persist event queue error, event: {}", stateEvent, e);
+            logger.error("Persist event queue error, event: {}", stateEvent, e);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
index 593567c6eb..3bf249ed3e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
@@ -71,11 +72,13 @@ public class TaskExecuteRunnable implements Runnable {
         while (!this.events.isEmpty()) {
             TaskEvent event = this.events.peek();
             try {
+                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
                 persist(event);
             } catch (Exception e) {
                 logger.error("persist error, event:{}, error: {}", event, e);
             } finally {
                 this.events.remove(event);
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
index 323ea86411..68e4511c42 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
@@ -115,11 +115,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             @Override
             public void onFailure(Throwable ex) {
                 Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
-                logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
+                logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex); // NOTE(review): id logged explicitly, presumably because no workflow MDC is set on this callback thread - confirm
                 if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                     taskExecuteThreadMap.remove(processInstanceId);
-                    logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
-                            processInstanceId);
+                    logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
+                        processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
@@ -127,11 +127,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
             @Override
             public void onSuccess(Object result) {
                 Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
-                logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
+                logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
                 if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                     taskExecuteThreadMap.remove(processInstanceId);
-                    logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
-                            processInstanceId);
+                    logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
+                        processInstanceId);
                 }
                 multiThreadFilterMap.remove(taskExecuteThread.getKey());
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 5cf1c945af..771bac4a7a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -99,7 +99,6 @@ public class MasterRegistryClient {
                                                                                          registryClient));
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
         } catch (Exception e) {
-            logger.error("master start up exception", e);
             throw new RegistryException("Master registry client start up error", e);
         }
     }
@@ -186,7 +185,7 @@ public class MasterRegistryClient {
      * Registry the current master server itself to registry.
      */
     void registry() {
-        logger.info("master node : {} registering to registry center...", masterAddress);
+        logger.info("Master node : {} registering to registry center", masterAddress);
         String localNodePath = getCurrentNodePath();
         int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
@@ -201,7 +200,7 @@ public class MasterRegistryClient {
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
 
         while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
-            logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
+            logger.warn("The current master server node:{} cannot be found in registry", NetUtils.getHost());
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
 
@@ -212,9 +211,7 @@ public class MasterRegistryClient {
         registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
 
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
-        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s",
-                    masterAddress,
-                    masterHeartbeatInterval);
+        logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
 
     }
 
@@ -223,12 +220,12 @@ public class MasterRegistryClient {
             String address = getLocalAddress();
             String localNodePath = getCurrentNodePath();
             registryClient.remove(localNodePath);
-            logger.info("master node : {} unRegistry to register center.", address);
+            logger.info("Master node : {} unregistered from registry center.", address);
             heartBeatExecutor.shutdown();
-            logger.info("heartbeat executor shutdown");
+            logger.info("MasterServer heartbeat executor shutdown");
             registryClient.close();
         } catch (Exception e) {
-            logger.error("remove registry path exception ", e);
+            logger.error("MasterServer remove registry path exception ", e);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index 97c67d8493..3ce7d4e240 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 
 import java.util.concurrent.TimeUnit;
@@ -49,25 +50,36 @@ public class EventExecuteService extends BaseDaemonThread {
 
     @Override
     public synchronized void start() {
+        logger.info("Master Event execute service starting");
         super.start();
+        logger.info("Master Event execute service started");
     }
 
     @Override
     public void run() {
-        logger.info("Event service started");
         while (Stopper.isRunning()) {
             try {
                 eventHandler();
                 TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+            } catch (InterruptedException interruptedException) {
+                logger.warn("Master event service interrupted, will exit this loop", interruptedException);
+                Thread.currentThread().interrupt(); // restore the interrupt flag before leaving the loop
+                break;
             } catch (Exception e) {
-                logger.error("Event service thread error", e);
+                logger.error("Master event execute service error", e);
             }
         }
     }
 
     private void eventHandler() {
         for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
-            workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
+                workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
+                // the workflow id MDC is removed in finally so it does not stay on this thread after the call
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
+            }
         }
     }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 1c6ea144e4..d6f3937f4b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -49,7 +49,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
 
     @Override
     public synchronized void start() {
+        logger.info("Master failover thread starting");
         super.start();
+        logger.info("Master failover thread started");
     }
 
     @Override
@@ -57,14 +59,13 @@ public class FailoverExecuteThread extends BaseDaemonThread {
         // when startup, wait 10s for ready
         ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
 
-        logger.info("failover execute thread started");
         while (Stopper.isRunning()) {
             try {
                 // todo: DO we need to schedule a task to do this kind of check
                 // This kind of check may only need to be executed when a master server start
                 failoverService.checkMasterFailover();
             } catch (Exception e) {
-                logger.error("failover execute error", e);
+                logger.error("Master failover thread execute error", e);
             } finally {
                 ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
             }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 94991a50c3..023d871355 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.SlotCheckState;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
@@ -111,20 +112,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
      * constructor of MasterSchedulerService
      */
     public void init() {
-        this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
+        this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
     }
 
     @Override
     public synchronized void start() {
+        logger.info("Master schedule service starting..");
         this.stateWheelExecuteThread.start();
         super.start();
+        logger.info("Master schedule service started...");
     }
 
     public void close() {
+        logger.info("Master schedule service stopping...");
         nettyRemotingClient.close();
-        logger.info("master schedule service stopped...");
+        logger.info("Master schedule service stopped...");
     }
 
     /**
@@ -132,7 +136,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
      */
     @Override
     public void run() {
-        logger.info("master scheduler started");
         while (Stopper.isRunning()) {
             try {
                 boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
@@ -142,8 +145,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
                     continue;
                 }
                 scheduleProcess();
+            } catch (InterruptedException interruptedException) {
+                logger.warn("Master schedule service interrupted, close the loop", interruptedException);
+                Thread.currentThread().interrupt();
+                break;
             } catch (Exception e) {
-                logger.error("master scheduler thread error", e);
+                logger.error("Master schedule service loop command error", e);
             }
         }
     }
@@ -152,7 +159,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
      * 1. get command by slot
      * 2. donot handle command if slot is empty
      */
-    private void scheduleProcess() throws Exception {
+    private void scheduleProcess() throws InterruptedException {
         List<Command> commands = findCommands();
         if (CollectionUtils.isEmpty(commands)) {
             //indicate that no command ,sleep for 1s
@@ -167,8 +174,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
         MasterServerMetrics.incMasterConsumeCommand(commands.size());
 
         for (ProcessInstance processInstance : processInstances) {
-
-            WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+                logger.info("Master schedule service starting workflow instance");
+                WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
                     processInstance
                     , processService
                     , nettyExecutorManager
@@ -176,15 +185,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
                     , masterConfig
                     , stateWheelExecuteThread);
 
-            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-            if (processInstance.getTimeout() > 0) {
-                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+                if (processInstance.getTimeout() > 0) {
+                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+                }
+                workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
+                logger.info("Master schedule service started workflow instance");
+                // the workflow id MDC set above is removed in finally, even if starting the workflow throws
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
             }
-            workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
         }
     }
 
-    private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
+    private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
+        logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
         List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
         CountDownLatch latch = new CountDownLatch(commands.size());
         for (final Command command : commands) {
@@ -193,7 +208,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
                     // slot check again
                     SlotCheckState slotCheckState = slotCheck(command);
                     if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
-                        logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
+                        logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                         return;
                     }
                     ProcessInstance processInstance = processService.handleCommand(logger,
@@ -201,10 +216,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
                             command);
                     if (processInstance != null) {
                         processInstances.add(processInstance);
-                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
+                        logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                     }
                 } catch (Exception e) {
-                    logger.error("handle command {} error ", command.getId(), e);
+                    logger.error("Master handle command {} error ", command.getId(), e);
                     processService.moveToErrorCommand(command, e.toString());
                 } finally {
                     latch.countDown();
@@ -212,13 +227,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
             });
         }
 
-        try {
-            // make sure to finish handling command each time before next scan
-            latch.await();
-        } catch (InterruptedException e) {
-            logger.error("countDownLatch await error ", e);
-        }
-
+        // make sure to finish handling command each time before next scan
+        latch.await(); // InterruptedException now propagates to the caller instead of being swallowed
+        logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
+            commands.size(), processInstances.size());
         return processInstances;
     }
 
@@ -231,6 +243,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
             int masterCount = ServerNodeManager.getMasterSize();
             if (masterCount > 0) {
                 result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+                if (CollectionUtils.isNotEmpty(result)) {
+                    logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+                        result.size(), thisMasterSlot, masterCount);
+                }
             }
         }
         return result;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 12d404406c..53acb4ce83 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import lombok.NonNull;
+
 /**
  * Check thread
  * 1. timeout task check
@@ -110,10 +113,16 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
 
     public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
         processInstanceTimeoutCheckList.add(processInstance.getId());
+        logger.info("Added workflow instance into timeout check list");
     }
 
     public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
-        processInstanceTimeoutCheckList.remove(processInstance.getId());
+        boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
+        if (removeFlag) {
+            logger.info("Removed workflow instance from timeout check list");
+        } else {
+            logger.warn("Failed to remove workflow instance from timeout check list");
+        }
     }
 
     private void checkProcess4Timeout() {
@@ -121,106 +130,95 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
             return;
         }
         for (Integer processInstanceId : processInstanceTimeoutCheckList) {
-            if (processInstanceId == null) {
-                continue;
-            }
             WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
             if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
+                logger.warn("[WorkflowInstance-{}] Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list", processInstanceId);
                 processInstanceTimeoutCheckList.remove(processInstanceId);
                 continue;
             }
             ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
             if (processInstance == null) {
+                logger.warn("[WorkflowInstance-{}] Check workflow timeout failed, the workflowInstance is null", processInstanceId);
                 continue;
             }
             long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
             if (timeRemain < 0) {
+                logger.info("[WorkflowInstance-{}] Workflow instance timeout, adding timeout event", processInstance.getId());
                 addProcessTimeoutEvent(processInstance);
                 processInstanceTimeoutCheckList.remove(processInstance.getId());
+                logger.info("[WorkflowInstance-{}] Workflow instance timeout, added timeout event", processInstance.getId());
             }
         }
     }
 
-    public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
+        logger.info("Adding task instance into timeout check list");
         if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
+            logger.warn("Task instance is already in timeout check list");
             return;
         }
         TaskDefinition taskDefinition = taskInstance.getTaskDefine();
         if (taskDefinition == null) {
-            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+            logger.error("Failed to add task instance into timeout check list, taskDefinition is null");
             return;
         }
         if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
             taskInstanceTimeoutCheckList.add(taskInstanceKey);
+            logger.info("Timeout flag is open, added task instance into timeout check list");
         }
         if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
             taskInstanceTimeoutCheckList.add(taskInstanceKey);
+            logger.info("Task instance is dependTask or subProcess, added task instance into timeout check list");
         }
     }
 
-    public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
         taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+        logger.info("Removed task instance from timeout check list");
     }
 
-    public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+        logger.info("Adding task instance into retry check list");
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
         if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
+            logger.warn("Task instance is already in retry check list");
             return;
         }
         TaskDefinition taskDefinition = taskInstance.getTaskDefine();
         if (taskDefinition == null) {
-            logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
+            logger.error("Add task instance into retry check list error, taskDefinition is null");
             return;
         }
-        logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
         taskInstanceRetryCheckList.add(taskInstanceKey);
+        logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list",
+            processInstance.getId(), taskInstance.getId());
     }
 
-    public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
         taskInstanceRetryCheckList.remove(taskInstanceKey);
+        logger.info("Removed task instance from retry check list");
     }
 
-    public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
+        logger.info("Adding task instance into state check list");
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
         if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
+            logger.warn("Task instance is already in state check list");
             return;
         }
         if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
             taskInstanceStateCheckList.add(taskInstanceKey);
+            logger.info("Added task instance into state check list");
         }
     }
 
-    public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
+    public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
-        if (taskInstanceKey == null) {
-            logger.error("taskInstanceKey is null");
-            return;
-        }
         taskInstanceStateCheckList.remove(taskInstanceKey);
+        logger.info("Removed task instance from state check list");
     }
 
     private void checkTask4Timeout() {
@@ -228,30 +226,35 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
             return;
         }
         for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
-            int processInstanceId = taskInstanceKey.getProcessInstanceId();
-            long taskCode = taskInstanceKey.getTaskCode();
+            try {
+                int processInstanceId = taskInstanceKey.getProcessInstanceId();
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+                long taskCode = taskInstanceKey.getTaskCode();
 
-            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-            if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceTimeoutCheckList.remove(taskInstanceKey);
-                continue;
-            }
-            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
-            if (!taskInstanceOptional.isPresent()) {
-                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceTimeoutCheckList.remove(taskInstanceKey);
-                continue;
-            }
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
-                long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
-                if (timeRemain < 0) {
-                    addTaskTimeoutEvent(taskInstance);
+                WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+                if (workflowExecuteThread == null) {
+                    logger.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
+                    taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+                    continue;
+                }
+                Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+                if (!taskInstanceOptional.isPresent()) {
+                    logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}, "
+                        + "will remove this check task", taskCode);
                     taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+                    continue;
+                }
+                TaskInstance taskInstance = taskInstanceOptional.get();
+                if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
+                    long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
+                    if (timeRemain < 0) {
+                        logger.info("Task instance is timeout, adding task timeout event and remove the check");
+                        addTaskTimeoutEvent(taskInstance);
+                        taskInstanceTimeoutCheckList.remove(taskInstanceKey);
+                    }
                 }
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
             }
         }
     }
@@ -264,41 +267,46 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
 
-            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+                WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
 
-            if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceRetryCheckList.remove(taskInstanceKey);
-                continue;
-            }
+                if (workflowExecuteThread == null) {
+                    logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+                        + "will remove this check task");
+                    taskInstanceRetryCheckList.remove(taskInstanceKey);
+                    continue;
+                }
 
-            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
-            ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
+                Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
+                ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
 
-            if (processInstance.getState() == ExecutionStatus.READY_STOP) {
-                addProcessStopEvent(processInstance);
-                taskInstanceRetryCheckList.remove(taskInstanceKey);
-                break;
-            }
+                if (processInstance.getState() == ExecutionStatus.READY_STOP) {
+                    logger.warn("The process instance is ready to stop, will send process stop event and remove the check task");
+                    addProcessStopEvent(processInstance);
+                    taskInstanceRetryCheckList.remove(taskInstanceKey);
+                    break;
+                }
 
-            if (!taskInstanceOptional.isPresent()) {
-                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceRetryCheckList.remove(taskInstanceKey);
-                continue;
-            }
+                if (!taskInstanceOptional.isPresent()) {
+                    logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
+                    taskInstanceRetryCheckList.remove(taskInstanceKey);
+                    continue;
+                }
 
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            if (taskInstance.retryTaskIntervalOverTime()) {
-                // reset taskInstance endTime and state
-                // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
-                taskInstance.setEndTime(null);
-                taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
+                TaskInstance taskInstance = taskInstanceOptional.get();
+                if (taskInstance.retryTaskIntervalOverTime()) {
+                    // reset taskInstance endTime and state
+                    // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
+                    taskInstance.setEndTime(null);
+                    taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
 
-                addTaskRetryEvent(taskInstance);
-                taskInstanceRetryCheckList.remove(taskInstanceKey);
+                    addTaskRetryEvent(taskInstance);
+                    taskInstanceRetryCheckList.remove(taskInstanceKey);
+                }
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
             }
         }
     }
@@ -311,25 +319,29 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
             int processInstanceId = taskInstanceKey.getProcessInstanceId();
             long taskCode = taskInstanceKey.getTaskCode();
 
-            WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
-            if (workflowExecuteThread == null) {
-                logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceStateCheckList.remove(taskInstanceKey);
-                continue;
-            }
-            Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
-            if (!taskInstanceOptional.isPresent()) {
-                logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
-                        processInstanceId, taskCode);
-                taskInstanceStateCheckList.remove(taskInstanceKey);
-                continue;
-            }
-            TaskInstance taskInstance = taskInstanceOptional.get();
-            if (taskInstance.getState().typeIsFinished()) {
-                continue;
+            try {
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+                WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+                if (workflowExecuteThread == null) {
+                    logger.warn("Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
+                    taskInstanceStateCheckList.remove(taskInstanceKey);
+                    continue;
+                }
+                Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
+                if (!taskInstanceOptional.isPresent()) {
+                    logger.warn(
+                        "Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
+                    taskInstanceStateCheckList.remove(taskInstanceKey);
+                    continue;
+                }
+                TaskInstance taskInstance = taskInstanceOptional.get();
+                if (taskInstance.getState().typeIsFinished()) {
+                    continue;
+                }
+                addTaskStateChangeEvent(taskInstance);
+            } finally {
+                LoggerUtils.removeWorkflowInstanceIdMDC();
             }
-            addTaskStateChangeEvent(taskInstance);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 07e0f14c11..9c5f4062f7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
 import org.apache.dolphinscheduler.common.process.ProcessDag;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
@@ -271,12 +272,16 @@ public class WorkflowExecuteRunnable implements Runnable {
         while (!this.stateEvents.isEmpty()) {
             try {
                 StateEvent stateEvent = this.stateEvents.peek();
+                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
                 if (stateEventHandler(stateEvent)) {
                     this.stateEvents.remove(stateEvent);
                 }
             } catch (Exception e) {
                 logger.error("state handle error:", e);
+            } finally {
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
+
         }
     }
 
@@ -775,7 +780,16 @@ public class WorkflowExecuteRunnable implements Runnable {
         if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
         }
-        cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
+
+        if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+            cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
+                cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)
+                    .substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA) + 1));
+        }
+
+        if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+            cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null));
+        }
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setTaskDependType(processInstance.getTaskDependType());
         command.setFailureStrategy(processInstance.getFailureStrategy());
@@ -962,8 +976,12 @@ public class WorkflowExecuteRunnable implements Runnable {
                 // reset global params while there are start parameters
                 setGlobalParamIfCommanded(processDefinition, cmdParam);
 
-                Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
-                Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+                Date start = null;
+                Date end = null;
+                if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
+                    start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
+                    end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
+                }
                 List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
                 if (complementListDate.isEmpty() && needComplementProcess()) {
                     complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 92ff7c04f1..031f8986ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -48,6 +49,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
 
 import com.google.common.base.Strings;
 
+import lombok.NonNull;
+
 /**
  * Used to execute {@link WorkflowExecuteRunnable}, when
  */
@@ -79,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     @PostConstruct
     private void init() {
         this.setDaemon(true);
-        this.setThreadNamePrefix("Workflow-Execute-Thread-");
+        this.setThreadNamePrefix("WorkflowExecuteThread-");
         this.setMaxPoolSize(masterConfig.getExecThreads());
         this.setCorePoolSize(masterConfig.getExecThreads());
     }
@@ -90,10 +93,11 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     public void submitStateEvent(StateEvent stateEvent) {
         WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
         if (workflowExecuteThread == null) {
-            logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
+            logger.warn("Submit state event error, cannot find workflowExecuteThread from cache manager, stateEvent:{}", stateEvent);
             return;
         }
         workflowExecuteThread.addStateEvent(stateEvent);
+        logger.info("Submit state event success, stateEvent: {}", stateEvent);
     }
 
     /**
@@ -112,7 +116,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
             return;
         }
         if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
-            logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
+            logger.warn("The workflow has been executed by another thread");
             return;
         }
         multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@@ -121,24 +125,31 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         future.addCallback(new ListenableFutureCallback() {
             @Override
             public void onFailure(Throwable ex) {
-                logger.error("handle events {} failed", processInstanceId, ex);
-                multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
+                try {
+                    logger.error("Workflow instance events handle failed", ex);
+                    multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+                } finally {
+                    LoggerUtils.removeWorkflowInstanceIdMDC();
+                }
             }
 
             @Override
             public void onSuccess(Object result) {
                 try {
+                    LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
                     if (workflowExecuteThread.workFlowFinish()) {
                         stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
                         processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                         notifyProcessChanged(workflowExecuteThread.getProcessInstance());
-                        logger.info("process instance {} finished.", processInstanceId);
+                        logger.info("Workflow instance is finished.");
                     }
                 } catch (Exception e) {
-                    logger.error("handle events {} success, but notify changed error", processInstanceId, e);
+                    logger.error("Workflow instance is finished, but notify changed error", e);
                 } finally {
                     // make sure the process has been removed from multiThreadFilterMap
                     multiThreadFilterMap.remove(workflowExecuteThread.getKey());
+                    LoggerUtils.removeWorkflowInstanceIdMDC();
                 }
             }
         });
@@ -167,9 +178,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
     /**
      * notify myself
      */
-    private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
-        logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
+    private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
         if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
+            logger.warn("The execute cache manager doesn't contains this workflow instance");
             return;
         }
         StateEvent stateEvent = new StateEvent();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 41c2bd56d3..4c732a0325 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -30,9 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.experimental.UtilityClass;
+
 /**
  * the factory to create task processor
  */
+@UtilityClass
 public final class TaskProcessorFactory {
 
     private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
@@ -46,7 +49,7 @@ public final class TaskProcessorFactory {
             try {
                 PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
             } catch (NoSuchMethodException e) {
-                throw new IllegalArgumentException("The task processor should has a no args constructor");
+                throw new IllegalArgumentException("The task processor should has a no args constructor", e);
             }
         }
     }
@@ -57,7 +60,6 @@ public final class TaskProcessorFactory {
         }
         Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
         if (iTaskProcessorConstructor == null) {
-            logger.warn("ITaskProcessor could not found for taskType: {}", type);
             iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
         }
 
@@ -74,7 +76,4 @@ public final class TaskProcessorFactory {
         return PROCESS_MAP.containsKey(type);
     }
 
-    private TaskProcessorFactory() {
-        throw new UnsupportedOperationException("TaskProcessorFactory cannot be instantiated");
-    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 3cc92fb905..94f33ccd83 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -17,8 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
-import io.micrometer.core.annotation.Counted;
-import io.micrometer.core.annotation.Timed;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -55,6 +54,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import io.micrometer.core.annotation.Counted;
+import io.micrometer.core.annotation.Timed;
+
 /**
  * failover service
  */
@@ -66,12 +68,14 @@ public class FailoverService {
     private final ProcessService processService;
     private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
-    public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService,
+    public FailoverService(RegistryClient registryClient,
+                           MasterConfig masterConfig,
+                           ProcessService processService,
                            WorkflowExecuteThreadPool workflowExecuteThreadPool) {
-        this.registryClient = registryClient;
-        this.masterConfig = masterConfig;
-        this.processService = processService;
-        this.workflowExecuteThreadPool = workflowExecuteThreadPool;
+        this.registryClient = checkNotNull(registryClient);
+        this.masterConfig = checkNotNull(masterConfig);
+        this.processService = checkNotNull(processService);
+        this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
     }
 
     /**
@@ -84,7 +88,7 @@ public class FailoverService {
         if (CollectionUtils.isEmpty(hosts)) {
             return;
         }
-        LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
+        LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
 
         for (String host : hosts) {
             failoverMasterWithLock(host);
@@ -274,7 +278,7 @@ public class FailoverService {
         while (iterator.hasNext()) {
             String host = iterator.next();
             if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
-                if (!host.equals(getLocalAddress())) {
+                if (!getLocalAddress().equals(host)) {
                     iterator.remove();
                 }
             }
@@ -294,7 +298,7 @@ public class FailoverService {
         boolean taskNeedFailover = true;
 
         if (taskInstance == null) {
-            LOGGER.error("failover task instance error, taskInstance is null");
+            LOGGER.error("Master failover task instance error, taskInstance is null");
             return false;
         }
 
diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml
index 2e9cb45ada..f1a2c1d51b 100644
--- a/dolphinscheduler-master/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml
@@ -21,7 +21,7 @@
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>
-                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
             </pattern>
             <charset>UTF-8</charset>
         </encoder>
@@ -57,7 +57,7 @@
         </rollingPolicy>
         <encoder>
             <pattern>
-                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
             </pattern>
             <charset>UTF-8</charset>
         </encoder>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index f5d7f935b3..ce6ad57035 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +38,7 @@ public class AlertClientService implements AutoCloseable {
 
     private final NettyRemotingClient client;
 
-    private volatile boolean isRunning;
+    private final AtomicBoolean isRunning;
 
     private String host;
 
@@ -53,16 +55,14 @@ public class AlertClientService implements AutoCloseable {
     public AlertClientService() {
         this.clientConfig = new NettyClientConfig();
         this.client = new NettyRemotingClient(clientConfig);
-        this.isRunning = true;
+        this.isRunning = new AtomicBoolean(true);
     }
 
     /**
      * alert client
      */
     public AlertClientService(String host, int port) {
-        this.clientConfig = new NettyClientConfig();
-        this.client = new NettyRemotingClient(clientConfig);
-        this.isRunning = true;
+        this();
         this.host = host;
         this.port = port;
     }
@@ -72,9 +72,14 @@ public class AlertClientService implements AutoCloseable {
      */
     @Override
     public void close() {
+        if (!isRunning.compareAndSet(true, false)) {
+            logger.warn("Alert client is already closed");
+            return;
+        }
+
+        logger.info("Alert client closing");
         this.client.close();
-        this.isRunning = false;
-        logger.info("alter client closed");
+        logger.info("Alert client closed");
     }
 
     /**
@@ -116,6 +121,6 @@ public class AlertClientService implements AutoCloseable {
     }
 
     public boolean isRunning() {
-        return isRunning;
+        return isRunning.get();
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
index 7dffe9c204..289e8ddbfb 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
 import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@@ -93,7 +94,7 @@ public class TaskPluginManager {
             logger.info("Registering task plugin: {}", name);
 
             if (!names.add(name)) {
-                throw new IllegalStateException(format("Duplicate task plugins named '%s'", name));
+                throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
             }
 
             loadTaskChannel(factory);
@@ -106,7 +107,7 @@ public class TaskPluginManager {
             PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
             int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
             if (count <= 0) {
-                throw new RuntimeException("Failed to update task plugin: " + name);
+                throw new TaskPluginException("Failed to update task plugin: " + name);
             }
         });
     }
diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
index d6ef76fa9e..0cdeabde5e 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
@@ -22,7 +22,7 @@
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>
-                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
             </pattern>
             <charset>UTF-8</charset>
         </encoder>
@@ -63,7 +63,7 @@
                 <file>${log.base}/${taskAppId}.log</file>
                 <encoder>
                     <pattern>
-                        [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %messsage%n
+                        [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %message%n
                     </pattern>
                     <charset>UTF-8</charset>
                 </encoder>
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
index d052075988..528dfa4058 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import lombok.NonNull;
+
 public final class ProcessUtils {
     private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
 
@@ -46,13 +48,13 @@ public final class ProcessUtils {
     /**
      * kill tasks according to different task types.
      */
-    public static void kill(TaskExecutionContext request) {
+    public static boolean kill(@NonNull TaskExecutionContext request) {
         try {
+            logger.info("Begin kill task instance, processId: {}", request.getProcessId());
             int processId = request.getProcessId();
             if (processId == 0) {
-                logger.error("process kill failed, process id :{}, task id:{}",
-                        processId, request.getTaskInstanceId());
-                return;
+                logger.error("Task instance kill failed, processId is not exist");
+                return false;
             }
 
             String cmd = String.format("kill -9 %s", getPidsStr(processId));
@@ -60,8 +62,11 @@ public final class ProcessUtils {
             logger.info("process id:{}, cmd:{}", processId, cmd);
 
             OSUtils.exeCmd(cmd);
+            logger.info("Success kill task instance, processId: {}", request.getProcessId());
+            return true;
         } catch (Exception e) {
-            logger.error("kill task failed", e);
+            logger.error("Kill task instance error, processId: {}", request.getProcessId(), e);
+            return false;
         }
     }
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
similarity index 60%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
index 67abde7e7a..246835260b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
@@ -15,26 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.thread;
+package org.apache.dolphinscheduler.plugin.task.api;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+public class TaskPluginException extends RuntimeException {
 
-/**
- *  if the process closes, a signal is placed as true, and all threads get this flag to stop working
- */
-public class Stopper {
-
-    private static AtomicBoolean signal = new AtomicBoolean(false);
-
-    public static final boolean isStopped() {
-        return signal.get();
-    }
-
-    public static final boolean isRunning() {
-        return !signal.get();
-    }
-
-    public static final void stop() {
-        signal.set(true);
+    public TaskPluginException(String message) {
+        super(message);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 8ac528dee4..93dc6309db 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,18 +21,11 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.remote.NettyRemotingServer;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import org.apache.dolphinscheduler.server.worker.prc.WorkerRpcServer;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -65,17 +58,6 @@ public class WorkerServer implements IStoppable {
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
 
-    /**
-     * netty remote server
-     */
-    private NettyRemotingServer nettyRemotingServer;
-
-    /**
-     * worker config
-     */
-    @Autowired
-    private WorkerConfig workerConfig;
-
     /**
      * spring application context
      * only use it for initialization
@@ -105,22 +87,7 @@ public class WorkerServer implements IStoppable {
     private TaskPluginManager taskPluginManager;
 
     @Autowired
-    private TaskExecuteProcessor taskExecuteProcessor;
-
-    @Autowired
-    private TaskKillProcessor taskKillProcessor;
-
-    @Autowired
-    private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
-
-    @Autowired
-    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
-
-    @Autowired
-    private HostUpdateProcessor hostUpdateProcessor;
-
-    @Autowired
-    private LoggerRequestProcessor loggerRequestProcessor;
+    private WorkerRpcServer workerRpcServer;
 
     /**
      * worker server startup, not use web service
@@ -132,48 +99,19 @@ public class WorkerServer implements IStoppable {
         SpringApplication.run(WorkerServer.class);
     }
 
-    /**
-     * worker server run
-     */
     @PostConstruct
     public void run() {
-        // init remoting server
-        NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(workerConfig.getListenPort());
-        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
-
-        // logger server
-        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
-
-        this.nettyRemotingServer.start();
-
-        // install task plugin
-        this.taskPluginManager.installPlugin();
+        this.workerRpcServer.start();
 
-        // worker registry
-        try {
-            this.workerRegistryClient.registry();
-            this.workerRegistryClient.setRegistryStoppable(this);
-            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
+        this.taskPluginManager.installPlugin();
 
-            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new RuntimeException(e);
-        }
+        this.workerRegistryClient.registry();
+        this.workerRegistryClient.setRegistryStoppable(this);
+        Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
+        this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
 
-        // task execute manager
         this.workerManagerThread.start();
 
-        // retry report task status
         this.retryReportTaskStatusThread.start();
 
         /*
@@ -181,7 +119,7 @@ public class WorkerServer implements IStoppable {
          */
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             if (Stopper.isRunning()) {
-                close("shutdownHook");
+                close("WorkerServer shutdown hook");
             }
         }));
     }
@@ -189,24 +127,23 @@ public class WorkerServer implements IStoppable {
     public void close(String cause) {
         try {
             // execute only once
-            if (Stopper.isStopped()) {
+            // set stop signal is true
+            if (!Stopper.stop()) {
+                logger.warn("WorkerServer is already stopped, current cause: {}", cause);
                 return;
             }
 
-            logger.info("worker server is stopping ..., cause : {}", cause);
-
-            // set stop signal is true
-            Stopper.stop();
+            logger.info("Worker server is stopping, current cause : {}", cause);
 
             try {
                 // thread sleep 3 seconds for thread quitely stop
-                Thread.sleep(3000L);
+                Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
             } catch (Exception e) {
-                logger.warn("thread sleep exception", e);
+                logger.warn("Worker server close wait error", e);
             }
 
             // close
-            this.nettyRemotingServer.close();
+            this.workerRpcServer.close();
             this.workerRegistryClient.unRegistry();
             this.alertClientService.close();
 
@@ -215,8 +152,9 @@ public class WorkerServer implements IStoppable {
 
             // close the application context
             this.springApplicationContext.close();
+            logger.info("Worker server stopped, current cause: {}", cause);
         } catch (Exception e) {
-            logger.error("worker server stop exception ", e);
+            logger.error("Worker server stop failed, current cause: {}", cause, e);
         }
     }
 
@@ -230,15 +168,22 @@ public class WorkerServer implements IStoppable {
      */
     public void killAllRunningTasks() {
         Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
-        logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
-
         if (CollectionUtils.isEmpty(taskRequests)) {
             return;
         }
-
+        logger.info("Worker begin to kill all cache task, task size: {}", taskRequests.size());
+        int killNumber = 0;
         for (TaskExecutionContext taskRequest : taskRequests) {
             // kill task when it's not finished yet
-            org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
+            try {
+                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId());
+                if (ProcessUtils.kill(taskRequest)) {
+                    killNumber++;
+                }
+            } finally {
+                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+            }
         }
+        logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java
new file mode 100644
index 0000000000..acb0ac6508
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.prc;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
+
+import java.io.Closeable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class WorkerRpcServer implements Closeable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
+
+    @Autowired
+    private TaskExecuteProcessor taskExecuteProcessor;
+
+    @Autowired
+    private TaskKillProcessor taskKillProcessor;
+
+    @Autowired
+    private TaskRecallAckProcessor taskRecallAckProcessor;
+
+    @Autowired
+    private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
+
+    @Autowired
+    private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+
+    @Autowired
+    private HostUpdateProcessor hostUpdateProcessor;
+
+    @Autowired
+    private LoggerRequestProcessor loggerRequestProcessor;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    private NettyRemotingServer nettyRemotingServer;
+
+    public void start() {
+        LOGGER.info("Worker rpc server starting");
+        NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(workerConfig.getListenPort());
+        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
+        // logger server
+        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
+        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
+        this.nettyRemotingServer.start();
+        LOGGER.info("Worker rpc server started");
+    }
+
+    @Override
+    public void close() {
+        LOGGER.info("Worker rpc server closing");
+        this.nettyRemotingServer.close();
+        LOGGER.info("Worker rpc server closed");
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 34382257d7..b355c6702b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,10 +18,12 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -90,15 +92,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     @Autowired
     private WorkerManagerThread workerManager;
 
-    @Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count")
-    @Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
+    @Autowired(required = false)
+    private StorageOperate storageOperate;
+
+    @Counted(value = "ds.task.execution.count", description = "task execute total count")
+    @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
-                String.format("invalid command type : %s", command.getType()));
+                                    String.format("invalid command type : %s", command.getType()));
 
-        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
-                command.getBody(), TaskExecuteRequestCommand.class);
+        TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(command.getBody(),
+                                                                             TaskExecuteRequestCommand.class);
 
         if (taskRequestCommand == null) {
             logger.error("task execute request command is null");
@@ -113,70 +118,98 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             logger.error("task execution context is null");
             return;
         }
-        TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
-
-        // set cache, it will be used when kill task
-        TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
-
-        // todo custom logger
-
-        taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
-        taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
-
-        if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
-            if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
-                OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
-            }
-
-            // check if the OS user exists
-            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
-                logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
-                        taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
-                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-                taskExecutionContext.setEndTime(new Date());
-                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-                return;
-            }
-
-            // local execute path
-            String execLocalPath = getExecLocalPath(taskExecutionContext);
-            logger.info("task instance local execute path : {}", execLocalPath);
-            taskExecutionContext.setExecutePath(execLocalPath);
-
-            try {
-                FileUtils.createWorkDirIfAbsent(execLocalPath);
-            } catch (Throwable ex) {
-                logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
-                logger.error("create executeLocalPath fail", ex);
-                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-                return;
-            }
-        }
-
-        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
-                new NettyRemoteChannel(channel, command.getOpaque()));
-
-        // delay task process
-        long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
-        if (remainTime > 0) {
-            logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
-            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
-            taskExecutionContext.setStartTime(null);
-            taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
-        }
-
-        // submit task to manager
-        boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
-        if (!offer) {
-            logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
-                    workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
-            taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
-            taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
-        }
-    }
+        try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+                                                        taskExecutionContext.getTaskInstanceId());
+
+            TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
+
+            // set cache, it will be used when kill task
+            TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+            // todo custom logger
+
+            taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
+            taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
+
+            if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+                boolean osUserExistFlag;
+                // Distributed tenant users on Linux are only checked with the `id` command; the worker must
+                // not create them, so tenantAutoCreate has no effect in that mode.
+                if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
+                    osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
+                } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+                    // create the OS user if it does not exist, then re-check
+                    OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+                    osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+                } else {
+                    osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
+                }
+
+                // check if the OS user exists
+                if (!osUserExistFlag) {
+                    logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
+                                 taskExecutionContext.getTenantCode(),
+                                 taskExecutionContext.getTaskInstanceId());
+                    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                    taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                    taskExecutionContext.setEndTime(new Date());
+                    taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    return;
+                }
+
+                // local execute path
+                String execLocalPath = getExecLocalPath(taskExecutionContext);
+                logger.info("task instance local execute path : {}", execLocalPath);
+                taskExecutionContext.setExecutePath(execLocalPath);
+
+                try {
+                    FileUtils.createWorkDirIfAbsent(execLocalPath);
+                } catch (Throwable ex) {
+                    logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
+                                 execLocalPath,
+                                 taskExecutionContext.getTaskInstanceId());
+                    logger.error("create executeLocalPath fail", ex);
+                    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+                    taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                    taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+                    return;
+                }
+            }
+
+            taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+                                                 new NettyRemoteChannel(channel, command.getOpaque()));
+
+            // delay task process
+            long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+                                                      taskExecutionContext.getDelayTime() * 60L);
+            if (remainTime > 0) {
+                logger.info("delay the execution of task instance {}, delay time: {} s",
+                            taskExecutionContext.getTaskInstanceId(),
+                            remainTime);
+                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+                taskExecutionContext.setStartTime(null);
+                taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+            }
+
+            // submit task to manager
+            boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+                                                                      taskCallbackService,
+                                                                      alertClientService,
+                                                                      taskPluginManager,
+                                                                      storageOperate));
+            if (!offer) {
+                logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
+                             workerManager.getDelayQueueSize(),
+                             taskExecutionContext.getTaskInstanceId());
+                taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+                taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+            }
+        } finally {
+            // MDC is thread-bound; clear it so these ids do not leak into later logs on this thread
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
+        }
+    }
 
     /**
      * get execute local path
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 9d74dc5dcc..421e92a530 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -44,19 +45,23 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
-                String.format("invalid command type : %s", command.getType()));
+            String.format("invalid command type : %s", command.getType()));
 
         TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
-                command.getBody(), TaskExecuteRunningAckCommand.class);
-
+            command.getBody(), TaskExecuteRunningAckCommand.class);
         if (runningAckCommand == null) {
             logger.error("task execute running ack command is null");
             return;
         }
-        logger.info("task execute running ack command : {}", runningAckCommand);
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
+            logger.info("task execute running ack command : {}", runningAckCommand);
 
-        if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
-            ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+            if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+                ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+            }
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
         }
     }
 
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index fc737ca1de..0e48465a07 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -47,9 +47,11 @@ public class RetryReportTaskStatusThread implements Runnable {
     private TaskCallbackService taskCallbackService;
 
     public void start() {
+        logger.info("Retry report task status thread starting");
         Thread thread = new Thread(this, "RetryReportTaskStatusThread");
         thread.setDaemon(true);
         thread.start();
+        logger.info("Retry report task status thread started");
     }
 
     /**
@@ -83,7 +85,7 @@ public class RetryReportTaskStatusThread implements Runnable {
                     }
                 }
             } catch (Exception e) {
-                logger.warn("retry report task status error", e);
+                logger.warn("Retry report task status error", e);
             }
         }
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index e6ff2b5653..ae89e9e946 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -76,14 +76,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
      */
     private TaskExecutionContext taskExecutionContext;
 
-    public StorageOperate getStorageOperate() {
-        return storageOperate;
-    }
-
-    public void setStorageOperate(StorageOperate storageOperate) {
-        this.storageOperate = storageOperate;
-    }
-
     private StorageOperate storageOperate;
 
     /**
@@ -107,24 +99,28 @@ public class TaskExecuteThread implements Runnable, Delayed {
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
-     * @param taskCallbackService taskCallbackService
+     * @param taskCallbackService  taskCallbackService
      */
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
                              TaskCallbackService taskCallbackService,
-                             AlertClientService alertClientService) {
+                             AlertClientService alertClientService,
+                             StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
+        this.storageOperate = storageOperate;
     }
 
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
                              TaskCallbackService taskCallbackService,
                              AlertClientService alertClientService,
-                             TaskPluginManager taskPluginManager) {
+                             TaskPluginManager taskPluginManager,
+                             StorageOperate storageOperate) {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.alertClientService = alertClientService;
         this.taskPluginManager = taskPluginManager;
+        this.storageOperate = storageOperate;
     }
 
     @Override
@@ -139,6 +135,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
         }
 
         try {
+            LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
             if (taskExecutionContext.getStartTime() == null) {
                 taskExecutionContext.setStartTime(new Date());
@@ -151,7 +148,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
             // copy hdfs/minio file to local
             List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
-            if (!fileDownloads.isEmpty()){
+            if (!fileDownloads.isEmpty()) {
                 downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
             }
 
@@ -211,6 +208,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
             TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
             taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
             clearTaskExecPath();
+            LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
     }
 
@@ -308,11 +306,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
 
     /**
      * download resource check
+     *
      * @param execLocalPath
      * @param projectRes
      * @return
      */
-    public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
+    public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
         if (MapUtils.isEmpty(projectRes)) {
             return Collections.emptyList();
         }
@@ -320,13 +319,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
         projectRes.forEach((key, value) -> {
             File resFile = new File(execLocalPath, key);
             boolean notExist = !resFile.exists();
-            if (notExist){
+            if (notExist) {
                 downloadFile.add(Pair.of(key, value));
-            } else{
+            } else {
                 logger.info("file : {} exists ", resFile.getName());
             }
         });
-        if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
+        if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
             throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
         }
         return downloadFile;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 2702d4af42..ae735f4338 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -24,13 +24,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,9 +129,11 @@ public class WorkerManagerThread implements Runnable {
     }
 
     public void start() {
+        logger.info("Worker manager thread starting");
         Thread thread = new Thread(this, this.getClass().getName());
         thread.setDaemon(true);
         thread.start();
+        logger.info("Worker manager thread started");
     }
 
     @Override
diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
index c6d8ee415d..571a3addd2 100644
--- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml
+++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml
@@ -22,7 +22,7 @@
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
             <pattern>
-                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
             </pattern>
             <charset>UTF-8</charset>
         </encoder>
@@ -58,7 +58,7 @@
         </rollingPolicy>
         <encoder>
             <pattern>
-                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
+                [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
             </pattern>
             <charset>UTF-8</charset>
         </encoder>
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index 2bb23cad6a..13c533f1cf 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -53,7 +54,7 @@ import org.slf4j.Logger;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
-        JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+    JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
 @Ignore
 public class TaskExecuteProcessorTest {
 
@@ -63,6 +64,8 @@ public class TaskExecuteProcessorTest {
 
     private ExecutorService workerExecService;
 
+    private StorageOperate storageOperate;
+
     private WorkerConfig workerConfig;
 
     private Command command;
@@ -99,19 +102,23 @@ public class TaskExecuteProcessorTest {
 
         PowerMockito.mockStatic(SpringApplicationContext.class);
         PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
-                .thenReturn(taskCallbackService);
+            .thenReturn(taskCallbackService);
         PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
-                .thenReturn(workerConfig);
+            .thenReturn(workerConfig);
 
         workerManager = PowerMockito.mock(WorkerManagerThread.class);
-        PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);
+
+        storageOperate = PowerMockito.mock(StorageOperate.class);
+        PowerMockito.when(
+                workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
+            .thenReturn(Boolean.TRUE);
 
         PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
-                .thenReturn(workerManager);
+            .thenReturn(workerManager);
 
         PowerMockito.mockStatic(ThreadUtils.class);
         PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
-                .thenReturn(workerExecService);
+            .thenReturn(workerExecService);
 
         PowerMockito.mockStatic(JsonSerializer.class);
         PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
@@ -125,16 +132,17 @@ public class TaskExecuteProcessorTest {
 
         PowerMockito.mockStatic(FileUtils.class);
         PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
-                        taskExecutionContext.getProcessDefineCode(),
-                        taskExecutionContext.getProcessDefineVersion(),
-                        taskExecutionContext.getProcessInstanceId(),
-                        taskExecutionContext.getTaskInstanceId()))
-                .thenReturn(taskExecutionContext.getExecutePath());
+                taskExecutionContext.getProcessDefineCode(),
+                taskExecutionContext.getProcessDefineVersion(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()))
+            .thenReturn(taskExecutionContext.getExecutePath());
         PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
 
-        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
+        SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
+            null, null, null, alertClientService, storageOperate);
         PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
-                .thenReturn(simpleTaskExecuteThread);
+            .thenReturn(simpleTaskExecuteThread);
     }
 
     @Test
@@ -172,8 +180,12 @@ public class TaskExecuteProcessorTest {
 
     private static class SimpleTaskExecuteThread extends TaskExecuteThread {
 
-        public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
-            super(taskExecutionContext, taskCallbackService, alertClientService);
+        public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
+                                       TaskCallbackService taskCallbackService,
+                                       Logger taskLogger,
+                                       AlertClientService alertClientService,
+                                       StorageOperate storageOperate) {
+            super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
         }
 
         @Override
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index f847690a6c..90577111b9 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -17,12 +17,20 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.dolphinscheduler.common.storage.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -31,11 +39,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 @RunWith(PowerMockRunner.class)
 public class TaskExecuteThreadTest {
 
@@ -50,20 +53,24 @@ public class TaskExecuteThreadTest {
     @Mock
     private AlertClientService alertClientService;
 
+    @Mock
+    private StorageOperate storageOperate;
+
     @Mock
     private TaskPluginManager taskPluginManager;
 
     @Test
-    public void checkTest(){
-        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
+    public void checkTest() {
+        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
+            alertClientService, taskPluginManager, storageOperate);
 
         String path = "/";
         Map<String, String> projectRes = new HashMap<>();
         projectRes.put("shell", "shell.sh");
         List<Pair<String, String>> downloads = new ArrayList<>();
-        try{
+        try {
             downloads = taskExecuteThread.downloadCheck(path, projectRes);
-        }catch (Exception e){
+        } catch (Exception e) {
             Assert.assertNotNull(e);
         }
         downloads.add(Pair.of("shell", "shell.sh"));