You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/06/28 09:43:23 UTC

[incubator-dolphinscheduler] branch dev updated: [Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38e4853  [Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965)
38e4853 is described below

commit 38e485373de9a7301a19f57cb53dbf4cb6744696
Author: Yichao Yang <10...@qq.com>
AuthorDate: Sun Jun 28 17:43:13 2020 +0800

    [Feature-2925][server] Init TaskLogger in TaskExecuteProcessor (#2925) (#2965)
    
    * [Feature-2925][common] Add exitVal judge in OSUtils.exeCmd (#2925)
    
    * optimize the logger utils
---
 .../common/shell/AbstractShell.java                |  40 +++++---
 .../dolphinscheduler/common/utils/FileUtils.java   |  60 +++++++++---
 .../dolphinscheduler/common/utils/LoggerUtils.java |  27 +++++-
 .../dolphinscheduler/common/utils/OSUtils.java     | 103 ++++++++++++---------
 .../dolphinscheduler/common/utils/OSUtilsTest.java |   6 +-
 .../dolphinscheduler/server/log/TaskLogFilter.java |   9 +-
 .../worker/processor/TaskExecuteProcessor.java     |  42 ++++++---
 .../server/worker/runner/TaskExecuteThread.java    |  36 +++----
 8 files changed, 220 insertions(+), 103 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java
index f846b19..aafdb86 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dolphinscheduler.common.shell;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +27,9 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /** 
  * A base class for running a Unix command.
@@ -128,7 +128,7 @@ public abstract class AbstractShell {
   /**
    * Run a command   actual work
    */
-  private void runCommand() throws IOException { 
+  private void runCommand() throws IOException {
     ProcessBuilder builder = new ProcessBuilder(getExecString());
     Timer timeOutTimer = null;
     ShellTimeoutTimerTask timeoutTimerTask = null;
@@ -153,11 +153,11 @@ public abstract class AbstractShell {
       timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
     }
     final BufferedReader errReader = 
-            new BufferedReader(new InputStreamReader(process
-                                                     .getErrorStream()));
-    BufferedReader inReader = 
-            new BufferedReader(new InputStreamReader(process
-                                                     .getInputStream()));
+            new BufferedReader(
+                    new InputStreamReader(process.getErrorStream()));
+    BufferedReader inReader =
+            new BufferedReader(
+                    new InputStreamReader(process.getInputStream()));
     final StringBuilder errMsg = new StringBuilder();
     
     // read error and input streams as this would free up the buffers
@@ -177,23 +177,35 @@ public abstract class AbstractShell {
         }
       }
     };
+    Thread inThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          parseExecResult(inReader);
+        } catch (IOException ioe) {
+          logger.warn("Error reading the in stream", ioe);
+        }
+        super.run();
+      }
+    };
     try {
       errThread.start();
+      inThread.start();
     } catch (IllegalStateException ise) { }
     try {
       // parse the output
-      parseExecResult(inReader);
-      exitCode  = process.waitFor();
+      exitCode = process.waitFor();
       try {
-        // make sure that the error thread exits
+        // make sure that the error and in thread exits
         errThread.join();
+        inThread.join();
       } catch (InterruptedException ie) {
-        logger.warn("Interrupted while reading the error stream", ie);
+        logger.warn("Interrupted while reading the error and in stream", ie);
       }
       completed.set(true);
       //the timeout thread handling
       //taken care in finally block
-      if (exitCode != 0) {
+      if (exitCode != 0 || errMsg.length() > 0) {
         throw new ExitCodeException(exitCode, errMsg.toString());
       }
     } catch (InterruptedException ie) {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index bae8f7f..de3d429 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -16,18 +16,32 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
+import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE;
+import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.Optional;
+
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.nio.charset.Charset;
-import java.nio.charset.UnsupportedCharsetException;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 /**
  * file utils
  */
@@ -36,6 +50,8 @@ public class FileUtils {
 
     public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");
 
+    public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
+
     /**
      * get file suffix
      *
@@ -118,7 +134,7 @@ public class FileUtils {
         String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
                 Integer.toString(processDefineId), Integer.toString(processInstanceId));
         File file = new File(fileName);
-        if (!file.getParentFile().exists()){
+        if (!file.getParentFile().exists()) {
             file.getParentFile().mkdirs();
         }
 
@@ -138,24 +154,40 @@ public class FileUtils {
      * @param userName user name
      * @throws IOException errors
      */
-    public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{
+    public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
         //if work dir exists, first delete
         File execLocalPathFile = new File(execLocalPath);
 
-        if (execLocalPathFile.exists()){
+        if (execLocalPathFile.exists()) {
             org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
         }
 
         //create work dir
         org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile);
-        logger.info("create dir success {}" , execLocalPath);
-
+        String mkdirLog = "create dir success " + execLocalPath;
+        LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
+        LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
 
         //if not exists this user,then create
-        if (!OSUtils.getUserList().contains(userName)){
-            OSUtils.createUser(userName);
+        OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
+        try {
+            if (!OSUtils.getUserList().contains(userName)) {
+                boolean isSuccessCreateUser = OSUtils.createUser(userName);
+
+                String infoLog;
+                if (isSuccessCreateUser) {
+                    infoLog = String.format("create user name success %s", userName);
+                } else {
+                    infoLog = String.format("create user name fail %s", userName);
+                }
+                LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
+                LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
+            }
+        } catch (Throwable e) {
+            LoggerUtils.logError(Optional.ofNullable(logger), e);
+            LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
         }
-        logger.info("create user name success {}", userName);
+        OSUtils.taskLoggerThreadLocal.remove();
     }
 
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 191df33..e3cf652 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -16,14 +16,15 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.slf4j.Logger;
-
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.slf4j.Logger;
+
 /**
  *  logger utils
  */
@@ -93,4 +94,24 @@ public class LoggerUtils {
         }
         return appIds;
     }
+
+    public static void logError(Optional<Logger> optionalLogger
+            , String error) {
+        optionalLogger.ifPresent((Logger logger) -> logger.error(error));
+    }
+
+    public static void logError(Optional<Logger> optionalLogger
+            , Throwable e) {
+        optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
+    }
+
+    public static void logError(Optional<Logger> optionalLogger
+            , String error, Throwable e) {
+        optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
+    }
+
+    public static void logInfo(Optional<Logger> optionalLogger
+            , String info) {
+        optionalLogger.ifPresent((Logger logger) -> logger.info(info));
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
index e3b2cc2..171a017 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
@@ -16,16 +16,6 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.shell.ShellExecutor;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import oshi.SystemInfo;
-import oshi.hardware.CentralProcessor;
-import oshi.hardware.GlobalMemory;
-import oshi.hardware.HardwareAbstractionLayer;
-
 import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -40,8 +30,21 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.StringTokenizer;
 import java.util.regex.Pattern;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.shell.ShellExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.GlobalMemory;
+import oshi.hardware.HardwareAbstractionLayer;
+
 /**
  * os utils
  *
@@ -50,6 +53,8 @@ public class OSUtils {
 
   private static final Logger logger = LoggerFactory.getLogger(OSUtils.class);
 
+  public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
+
   private static final SystemInfo SI = new SystemInfo();
   public static final String TWO_DECIMAL = "0.00";
 
@@ -251,7 +256,9 @@ public class OSUtils {
     try {
       String userGroup = OSUtils.getGroup();
       if (StringUtils.isEmpty(userGroup)) {
-        logger.error("{} group does not exist for this operating system.", userGroup);
+        String errorLog = String.format("%s group does not exist for this operating system.", userGroup);
+        LoggerUtils.logError(Optional.ofNullable(logger), errorLog);
+        LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog);
         return false;
       }
       if (isMacOS()) {
@@ -263,7 +270,8 @@ public class OSUtils {
       }
       return true;
     } catch (Exception e) {
-      logger.error(e.getMessage(), e);
+      LoggerUtils.logError(Optional.ofNullable(logger), e);
+      LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
     }
 
     return false;
@@ -276,10 +284,14 @@ public class OSUtils {
    * @throws IOException in case of an I/O error
    */
   private static void createLinuxUser(String userName, String userGroup) throws IOException {
-    logger.info("create linux os user : {}", userName);
-    String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
+    String infoLog1 = String.format("create linux os user : %s", userName);
+    LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
+    LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
 
-    logger.info("execute cmd : {}", cmd);
+    String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
+    String infoLog2 = String.format("execute cmd : %s", cmd);
+    LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
+    LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
     OSUtils.exeCmd(cmd);
   }
 
@@ -290,13 +302,24 @@ public class OSUtils {
    * @throws IOException in case of an I/O error
    */
   private static void createMacUser(String userName, String userGroup) throws IOException {
-    logger.info("create mac os user : {}", userName);
-    String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
-    String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
 
-    logger.info("create user command : {}", userCreateCmd);
-    OSUtils.exeCmd(userCreateCmd);
-    logger.info("append user to group : {}", appendGroupCmd);
+    Optional<Logger> optionalLogger = Optional.ofNullable(logger);
+    Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get());
+
+    String infoLog1 = String.format("create mac os user : %s", userName);
+    LoggerUtils.logInfo(optionalLogger, infoLog1);
+    LoggerUtils.logInfo(optionalTaskLogger, infoLog1);
+
+    String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
+    String infoLog2 = String.format("create user command : %s", createUserCmd);
+    LoggerUtils.logInfo(optionalLogger, infoLog2);
+    LoggerUtils.logInfo(optionalTaskLogger, infoLog2);
+    OSUtils.exeCmd(createUserCmd);
+
+    String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
+    String infoLog3 = String.format("append user to group : %s", appendGroupCmd);
+    LoggerUtils.logInfo(optionalLogger, infoLog3);
+    LoggerUtils.logInfo(optionalTaskLogger, infoLog3);
     OSUtils.exeCmd(appendGroupCmd);
   }
 
@@ -307,14 +330,20 @@ public class OSUtils {
    * @throws IOException in case of an I/O error
    */
   private static void createWindowsUser(String userName, String userGroup) throws IOException {
-    logger.info("create windows os user : {}", userName);
-    String userCreateCmd = String.format("net user \"%s\" /add", userName);
-    String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
+    String infoLog1 = String.format("create windows os user : %s", userName);
+    LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
+    LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
 
-    logger.info("execute create user command : {}", userCreateCmd);
+    String userCreateCmd = String.format("net user \"%s\" /add", userName);
+    String infoLog2 = String.format("execute create user command : %s", userCreateCmd);
+    LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
+    LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
     OSUtils.exeCmd(userCreateCmd);
 
-    logger.info("execute append user to group : {}", appendGroupCmd);
+    String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
+    String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd);
+    LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3);
+    LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3);
     OSUtils.exeCmd(appendGroupCmd);
   }
 
@@ -353,22 +382,12 @@ public class OSUtils {
    * @throws IOException errors
    */
   public static String exeCmd(String command) throws IOException {
-    BufferedReader br = null;
-
-    try {
-      Process p = Runtime.getRuntime().exec(command);
-      br = new BufferedReader(new InputStreamReader(p.getInputStream()));
-      String line;
-      StringBuilder sb = new StringBuilder();
-
-      while ((line = br.readLine()) != null) {
-        sb.append(line + "\n");
-      }
-
-      return sb.toString();
-    } finally {
-      IOUtils.closeQuietly(br);
+    StringTokenizer st = new StringTokenizer(command);
+    String[] cmdArray = new String[st.countTokens()];
+    for (int i = 0; st.hasMoreTokens(); i++) {
+      cmdArray[i] = st.nextToken();
     }
+    return exeShell(cmdArray);
   }
 
   /**
@@ -377,7 +396,7 @@ public class OSUtils {
    * @return result of execute the shell
    * @throws IOException errors
    */
-  public static String exeShell(String command) throws IOException {
+  public static String exeShell(String[] command) throws IOException {
     return ShellExecutor.execCommand(command);
   }
 
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
index e1fa0c5..44c88f8 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
@@ -68,7 +68,11 @@ public class OSUtilsTest {
     @Test
     public void createUser() {
         boolean result = OSUtils.createUser("test123");
-        Assert.assertTrue(result);
+        if (result) {
+            Assert.assertTrue("create user test123 success", true);
+        } else {
+            Assert.assertTrue("create user test123 fail", true);
+        }
     }
 
     @Test
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java
index 9543416..9c47fb9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java
@@ -16,11 +16,14 @@
  */
 package org.apache.dolphinscheduler.server.log;
 
+import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT;
+
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.filter.Filter;
 import ch.qos.logback.core.spi.FilterReply;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 
 /**
  *  task log filter
@@ -43,7 +46,9 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
      */
     @Override
     public FilterReply decide(ILoggingEvent event) {
-        if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) {
+        if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)
+                || event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT)
+                || event.getLevel().isGreaterOrEqual(level)) {
             return FilterReply.ACCEPT;
         }
         return FilterReply.DENY;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 0af84b1..4a2767f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -17,15 +17,22 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.sift.SiftingAppender;
-import com.github.rholder.retry.RetryException;
-import io.netty.channel.Channel;
+
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.common.utils.RetryerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -40,9 +47,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import com.github.rholder.retry.RetryException;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.sift.SiftingAppender;
+import io.netty.channel.Channel;
 
 /**
  *  worker request processor
@@ -96,15 +105,26 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
         taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
 
+        // custom logger
+        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                taskExecutionContext.getProcessDefineId(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()));
+
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);
         logger.info("task instance  local execute path : {} ", execLocalPath);
 
+        FileUtils.taskLoggerThreadLocal.set(taskLogger);
         try {
             FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
-        } catch (Exception ex){
-            logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
+        } catch (Throwable ex) {
+            String errorLog = String.format("create execLocalPath : %s", execLocalPath);
+            LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
+            LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
         }
+        FileUtils.taskLoggerThreadLocal.remove();
+
         taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                 new NettyRemoteChannel(channel, command.getOpaque()));
 
@@ -117,7 +137,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
                 return Boolean.TRUE;
             });
             // submit task
-            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
+            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
         } catch (ExecutionException | RetryException e) {
             logger.error(e.getMessage(), e);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index b6ab894..a2ad762 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -16,7 +16,12 @@
  */
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import org.apache.dolphinscheduler.common.utils.*;
+import java.io.File;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -24,6 +29,10 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@@ -32,10 +41,6 @@ import org.apache.dolphinscheduler.server.worker.task.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.*;
-import java.util.stream.Collectors;
-
 
 /**
  *  task scheduler thread
@@ -63,13 +68,21 @@ public class TaskExecuteThread implements Runnable {
     private TaskCallbackService taskCallbackService;
 
     /**
+     * task logger
+     */
+    private Logger taskLogger;
+
+    /**
      *  constructor
      * @param taskExecutionContext taskExecutionContext
      * @param taskCallbackService taskCallbackService
      */
-    public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){
+    public TaskExecuteThread(TaskExecutionContext taskExecutionContext
+            , TaskCallbackService taskCallbackService
+            , Logger taskLogger) {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
+        this.taskLogger = taskLogger;
     }
 
     @Override
@@ -99,16 +112,7 @@ public class TaskExecuteThread implements Runnable {
                     taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId()));
 
-            // custom logger
-            Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-                    taskExecutionContext.getProcessDefineId(),
-                    taskExecutionContext.getProcessInstanceId(),
-                    taskExecutionContext.getTaskInstanceId()));
-
-
-
-            task = TaskManager.newTask(taskExecutionContext,
-                    taskLogger);
+            task = TaskManager.newTask(taskExecutionContext, taskLogger);
 
             // task init
             task.init();