You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2023/07/19 06:50:10 UTC

[dolphinscheduler] branch dev updated: Support execute shell in different interceptor (#14582)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c30cca9d9a Support execute shell in different interceptor (#14582)
c30cca9d9a is described below

commit c30cca9d9a0386002437829de9310b592c6d54bb
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 19 14:50:04 2023 +0800

    Support execute shell in different interceptor (#14582)
---
 .../src/main/resources/common.properties           |   2 +
 .../plugin/task/api/AbstractCommandExecutor.java   | 153 ++++++------------
 .../plugin/task/api/AbstractYarnTask.java          |  36 ++---
 .../plugin/task/api/ShellCommandExecutor.java      | 107 -------------
 .../plugin/task/api/TaskExecutionContext.java      |   1 +
 .../shell/BaseLinuxShellInterceptorBuilder.java    | 171 +++++++++++++++++++++
 .../task/api/shell/BaseShellInterceptor.java       |  50 ++++++
 .../api/shell/BaseShellInterceptorBuilder.java     | 141 +++++++++++++++++
 .../shell/BaseWindowsShellInterceptorBuilder.java  | 116 ++++++++++++++
 .../plugin/task/api/shell/IShellInterceptor.java   |  30 ++++
 .../task/api/shell/IShellInterceptorBuilder.java   |  52 +++++++
 .../api/shell/ShellInterceptorBuilderFactory.java  |  43 ++++++
 .../task/api/shell/bash/BashShellInterceptor.java  |  30 ++++
 .../shell/bash/BashShellInterceptorBuilder.java    |  56 +++++++
 .../task/api/shell/cmd/CmdShellInterceptor.java    |  29 ++++
 .../api/shell/cmd/CmdShellInterceptorBuilder.java  |  55 +++++++
 .../task/api/shell/sh/ShShellInterceptor.java      |  29 ++++
 .../api/shell/sh/ShShellInterceptorBuilder.java    |  60 ++++++++
 .../plugin/task/chunjun/ChunJunTask.java           |  61 ++------
 .../plugin/task/dq/DataQualityTask.java            |  17 +-
 .../plugin/task/datax/DataxTask.java               |  60 ++------
 .../plugin/task/datax/DataxTaskTest.java           |  31 ++--
 .../dolphinscheduler/plugin/task/dvc/DvcTask.java  |   7 +-
 .../plugin/task/flink/FlinkStreamTask.java         |  15 +-
 .../plugin/task/flink/FlinkTask.java               |  15 +-
 .../plugin/task/hivecli/HiveCliTask.java           |   6 +-
 .../plugin/task/java/JavaTask.java                 |   6 +-
 .../plugin/task/jupyter/JupyterTask.java           |  19 ++-
 .../plugin/task/linkis/LinkisTask.java             |  35 ++---
 .../plugin/task/mlflow/MlflowTask.java             |  18 ++-
 .../plugin/task/mr/MapReduceTask.java              |  13 +-
 .../plugin/task/python/PythonTask.java             |  12 +-
 .../plugin/task/pytorch/PytorchTask.java           |  17 +-
 .../plugin/task/seatunnel/SeatunnelTask.java       |   7 +-
 .../plugin/task/shell/ShellTask.java               |  59 +------
 .../plugin/task/spark/SparkTask.java               |  20 +--
 .../plugin/task/spark/SparkTaskTest.java           |   4 +-
 .../plugin/task/sqoop/SqoopTask.java               |  14 +-
 38 files changed, 1081 insertions(+), 516 deletions(-)

diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index e7c119197e..3107929d56 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -153,6 +153,8 @@ appId.collect=log
 
 # The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
 shell.env_source_list=
+# The interceptor type of Shell task, e.g. bash, sh, cmd
+shell.interceptor.type=bash
 
 # Whether to enable remote logging
 remote.logging.enable=false
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 0967c642fc..4c8edc6dac 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -25,22 +25,20 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
 import org.apache.dolphinscheduler.common.constants.TenantConstants;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
-import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.reflect.Field;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -114,111 +112,61 @@ public abstract class AbstractCommandExecutor {
         }
     }
 
-    public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
-        this.logBuffer = logBuffer;
-    }
-
-    /**
-     * build process
-     *
-     * @param commandFile command file
-     * @throws IOException IO Exception
-     */
-    private void buildProcess(String commandFile) throws IOException {
-        // setting up user to run commands
-        List<String> command = new LinkedList<>();
-
-        // init process builder
-        ProcessBuilder processBuilder = new ProcessBuilder();
-        // setting up a working directory
-        processBuilder.directory(new File(taskRequest.getExecutePath()));
-        // merge error information to standard output stream
-        processBuilder.redirectErrorStream(true);
-
-        // if sudo.enable=true,setting up user to run commands
-        // todo: Create a ShellExecuteClass to generate the shell and execute shell commands
-        if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
-            if (SystemUtils.IS_OS_LINUX
-                    && PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
-                generateCgroupCommand(command);
-            } else {
-                command.add("sudo");
-                command.add("-u");
-                command.add(taskRequest.getTenantCode());
-                command.add("-E");
-            }
-        }
-        command.add(commandInterpreter());
-        command.add(commandFile);
-
-        // setting commands
-        processBuilder.command(command);
-        process = processBuilder.start();
-
-        printCommand(command);
-    }
-
-    /**
-     * generate systemd command.
-     * eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root
-     * @param command command
-     */
-    private void generateCgroupCommand(List<String> command) {
-        Integer cpuQuota = taskRequest.getCpuQuota();
-        Integer memoryMax = taskRequest.getMemoryMax();
-
-        command.add("sudo");
-        command.add("systemd-run");
-        command.add("-q");
-        command.add("--scope");
-
-        if (cpuQuota == -1) {
-            command.add("-p");
-            command.add("CPUQuota=");
-        } else {
-            command.add("-p");
-            command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
-        }
-
-        // use `man systemd.resource-control` to find available parameter
-        if (memoryMax == -1) {
-            command.add("-p");
-            command.add(String.format("MemoryLimit=%s", "infinity"));
-        } else {
-            command.add("-p");
-            command.add(String.format("MemoryLimit=%sM", taskRequest.getMemoryMax()));
-        }
-
-        command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
-    }
-
-    public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception {
+    // todo: We need to build the IShellInterceptor in the outer class, since different tasks may have specific logic
+    // to build the IShellInterceptor
+    public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
+                            TaskCallBack taskCallBack) throws Exception {
         TaskResponse result = new TaskResponse();
         int taskInstanceId = taskRequest.getTaskInstanceId();
         if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+            logger.warn(
+                    "Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
+                    taskInstanceId);
             result.setExitStatusCode(EXIT_CODE_KILL);
             return result;
         }
-        if (StringUtils.isEmpty(execCommand)) {
-            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
-            return result;
+        iShellInterceptorBuilder = iShellInterceptorBuilder
+                .shellDirectory(taskRequest.getExecutePath())
+                .shellName(taskRequest.getTaskAppId());
+        // Set system env
+        if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
+            ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
+        }
+        // Set custom env
+        if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
+            iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
+        }
+        // Set k8s config (this only works on Linux)
+        if (taskRequest.getK8sTaskExecutionContext() != null) {
+            iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
+        }
+        // Set sudo (this only works on Linux)
+        iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
+        // Set tenant (this only works on Linux)
+        if (TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
+            iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER);
+        } else {
+            iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
+        }
+        // Set CPU quota (this only works on Linux)
+        if (taskRequest.getCpuQuota() != null) {
+            iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
+        }
+        // Set memory quota (this only works on Linux)
+        if (taskRequest.getMemoryMax() != null) {
+            iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
         }
 
-        String commandFilePath = buildCommandFilePath();
-
-        // create command file if not exists
-        createCommandFileIfNotExists(execCommand, commandFilePath);
-
-        // build process
-        buildProcess(commandFilePath);
+        IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
+        process = iShellInterceptor.execute();
 
         // parse process output
-        parseProcessOutput(process);
+        parseProcessOutput(this.process);
 
         // collect pod log
         collectPodLogIfNeeded();
 
-        int processId = getProcessId(process);
+        int processId = getProcessId(this.process);
 
         result.setProcessId(processId);
 
@@ -243,7 +191,7 @@ public abstract class AbstractCommandExecutor {
         }
 
         // waiting for the run to finish
-        boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
+        boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
 
         TaskExecutionStatus kubernetesStatus =
                 ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
@@ -272,7 +220,7 @@ public abstract class AbstractCommandExecutor {
         if (status && kubernetesStatus.isSuccess()) {
 
             // SHELL task state
-            result.setExitStatusCode(process.exitValue());
+            result.setExitStatusCode(this.process.exitValue());
 
         } else {
             logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
@@ -280,7 +228,7 @@ public abstract class AbstractCommandExecutor {
             result.setExitStatusCode(EXIT_CODE_FAILURE);
             cancelApplication();
         }
-        int exitCode = process.exitValue();
+        int exitCode = this.process.exitValue();
         String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
         logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
                 exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
@@ -446,9 +394,4 @@ public abstract class AbstractCommandExecutor {
         return processId;
     }
 
-    protected abstract String buildCommandFilePath();
-
-    protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
-
-    protected abstract String commandInterpreter();
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 406e78a80b..69f6aceb99 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -22,31 +22,17 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COL
 
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
 
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.Map;
 
-/**
- * abstract yarn task
- */
 public abstract class AbstractYarnTask extends AbstractRemoteTask {
 
-    /**
-     * process task
-     */
     private ShellCommandExecutor shellCommandExecutor;
 
-    /**
-     * rules for extracting application ID
-     */
-    protected static final Pattern YARN_APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
-
-    /**
-     * Abstract Yarn Task
-     *
-     * @param taskRequest taskRequest
-     */
     public AbstractYarnTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
@@ -58,8 +44,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
+            IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(getProperties())
+                    // todo: do we need to move the replace to subclass?
+                    .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
             // SHELL task exit code
-            TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
+            TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(response.getExitStatusCode());
             // set appIds
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
@@ -115,10 +105,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
     }
 
     /**
-     * create command
-     *
-     * @return String
+     * Get the script used to bootstrap the task
      */
-    protected abstract String buildCommand();
+    protected abstract String getScript();
 
+    /**
+     * Get the properties of the task used to replace the placeholders in the script.
+     */
+    protected abstract Map<String, String> getProperties();
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index 9bcc765bb3..2dbea62287 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -17,19 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.task.api;
 
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Objects;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
@@ -40,104 +27,10 @@ import org.slf4j.Logger;
  */
 public class ShellCommandExecutor extends AbstractCommandExecutor {
 
-    /**
-     * For Unix-like, using bash
-     */
-    private static final String SH = "bash";
-
-    /**
-     * For Windows, using cmd.exe
-     */
-    private static final String CMD = "cmd.exe";
-
-    /**
-     * constructor
-     *
-     * @param logHandler logHandler
-     * @param taskRequest taskRequest
-     * @param logger logger
-     */
     public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                 TaskExecutionContext taskRequest,
                                 Logger logger) {
         super(logHandler, taskRequest, logger);
     }
 
-    public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
-        super(logBuffer);
-    }
-
-    @Override
-    protected String buildCommandFilePath() {
-        // command file
-        return String.format("%s/%s.%s", taskRequest.getExecutePath(), taskRequest.getTaskAppId(),
-                SystemUtils.IS_OS_WINDOWS ? "bat" : "command");
-    }
-
-    /**
-     * create command file if not exists
-     *
-     * @param execCommand exec command
-     * @param commandFile command file
-     * @throws IOException io exception
-     */
-    @Override
-    protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
-        // create if non existence
-        logger.info("Begin to create command file:{}", commandFile);
-
-        Path commandFilePath = Paths.get(commandFile);
-        if (Files.exists(commandFilePath)) {
-            logger.warn("The command file: {} is already exist, will not create a again", commandFile);
-            return;
-        }
-
-        StringBuilder sb = new StringBuilder();
-        if (SystemUtils.IS_OS_WINDOWS) {
-            sb.append("@echo off").append(System.lineSeparator());
-            sb.append("cd /d %~dp0").append(System.lineSeparator());
-            if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
-                for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
-                    sb.append("call ").append(envSourceFile).append("\n");
-                }
-            }
-            if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
-                sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
-            }
-        } else {
-            sb.append("#!/bin/bash").append(System.lineSeparator());
-            sb.append("BASEDIR=$(cd `dirname $0`; pwd)").append(System.lineSeparator());
-            sb.append("cd $BASEDIR").append(System.lineSeparator());
-            if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
-                for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
-                    sb.append("source ").append(envSourceFile).append("\n");
-                }
-            }
-            if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
-                sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
-            }
-            if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) {
-                String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml();
-                Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils
-                        .getKubeConfigPath(taskRequest.getExecutePath()));
-                FileUtils.createFileWith755(kubeConfigPath);
-                Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND);
-                sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator());
-                logger.info("Create kubernetes configuration file: {}.", kubeConfigPath);
-            }
-        }
-        sb.append(execCommand);
-        String commandContent = sb.toString();
-
-        FileUtils.createFileWith755(commandFilePath);
-        Files.write(commandFilePath, commandContent.getBytes(), StandardOpenOption.APPEND);
-
-        logger.info("Success create command file, command: {}", commandContent);
-    }
-
-    @Override
-    protected String commandInterpreter() {
-        return SystemUtils.IS_OS_WINDOWS ? CMD : SH;
-    }
-
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index b8d125e5a9..de9a727567 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -173,6 +173,7 @@ public class TaskExecutionContext implements Serializable {
 
     /**
      * definedParams
+     * todo: we need to rename definedParams, prepareParamsMap and paramsMap; the current names are confusing
      */
     private Map<String, String> definedParams;
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
new file mode 100644
index 0000000000..c0f03fdccd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public abstract class BaseLinuxShellInterceptorBuilder<T extends BaseLinuxShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        extends
+            BaseShellInterceptorBuilder<T, Y> {
+
+    protected void generateShellScript() throws IOException {
+        List<String> finalScripts = new ArrayList<>();
+        // add shell header
+        finalScripts.add(shellHeader());
+        finalScripts.add("BASEDIR=$(cd `dirname $0`; pwd)");
+        finalScripts.add("cd $BASEDIR");
+        // add system env
+        finalScripts.addAll(systemEnvScript());
+        // add custom env
+        finalScripts.addAll(customEnvScript());
+        // add k8s config
+        finalScripts.addAll(k8sConfig());
+        // add shell body
+        finalScripts.add(shellBody());
+        // create shell file
+        String finalScript = finalScripts.stream().collect(Collectors.joining(System.lineSeparator()));
+        Path shellAbsolutePath = shellAbsolutePath();
+        FileUtils.createFileWith755(shellAbsolutePath);
+        Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
+        log.info("Final Shell file is : \n{}", finalScript);
+    }
+
+    protected List<String> generateBootstrapCommand() {
+        if (sudoEnable) {
+            return bootstrapCommandInSudoMode();
+        }
+        return bootstrapCommandInNormalMode();
+    }
+
+    protected abstract String shellHeader();
+
+    protected abstract String shellInterpreter();
+
+    protected abstract String shellExtension();
+
+    private List<String> systemEnvScript() {
+        if (CollectionUtils.isEmpty(systemEnvs)) {
+            return Collections.emptyList();
+        }
+        return systemEnvs
+                .stream()
+                .map(systemEnv -> "source " + systemEnv).collect(Collectors.toList());
+    }
+
+    private List<String> customEnvScript() {
+        if (CollectionUtils.isEmpty(customEnvScripts)) {
+            return Collections.emptyList();
+        }
+        return customEnvScripts;
+    }
+
+    private List<String> k8sConfig() throws IOException {
+        if (StringUtils.isEmpty(k8sConfigYaml)) {
+            return Collections.emptyList();
+        }
+        Path kubeConfigPath = Paths.get(FileUtils.getKubeConfigPath(shellDirectory));
+        FileUtils.createFileWith755(kubeConfigPath);
+        Files.write(kubeConfigPath, k8sConfigYaml.getBytes(), StandardOpenOption.APPEND);
+        log.info("Created kubernetes configuration file: {}.", kubeConfigPath);
+        return Collections.singletonList("export KUBECONFIG=" + kubeConfigPath);
+    }
+
+    private String shellBody() {
+        if (CollectionUtils.isEmpty(scripts)) {
+            return StringUtils.EMPTY;
+        }
+        String scriptBody = scripts
+                .stream()
+                .collect(Collectors.joining(System.lineSeparator()));
+        scriptBody = scriptBody.replaceAll("\\r\\n", System.lineSeparator());
+        return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
+    }
+
+    private Path shellAbsolutePath() {
+        return Paths.get(shellDirectory, shellName + shellExtension());
+    }
+
+    private List<String> bootstrapCommandInSudoMode() {
+        if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
+            return bootstrapCommandInResourceLimitMode();
+        }
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add("sudo");
+        if (StringUtils.isNotBlank(runUser)) {
+            bootstrapCommand.add("-u");
+            bootstrapCommand.add(runUser);
+        }
+        bootstrapCommand.add("-E");
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    private List<String> bootstrapCommandInNormalMode() {
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add(shellInterpreter());
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    private List<String> bootstrapCommandInResourceLimitMode() {
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add("sudo");
+        bootstrapCommand.add("systemd-run");
+        bootstrapCommand.add("-q");
+        bootstrapCommand.add("--scope");
+
+        if (cpuQuota == -1) {
+            bootstrapCommand.add("-p");
+            bootstrapCommand.add("CPUQuota=");
+        } else {
+            bootstrapCommand.add("-p");
+            bootstrapCommand.add(String.format("CPUQuota=%s%%", cpuQuota));
+        }
+
+        // use `man systemd.resource-control` to find available parameter
+        if (memoryQuota == -1) {
+            bootstrapCommand.add("-p");
+            bootstrapCommand.add(String.format("MemoryLimit=%s", "infinity"));
+        } else {
+            bootstrapCommand.add("-p");
+            bootstrapCommand.add(String.format("MemoryLimit=%sM", memoryQuota));
+        }
+
+        bootstrapCommand.add(String.format("--uid=%s", runUser));
+        return bootstrapCommand;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java
new file mode 100644
index 0000000000..0f82956204
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base implementation of {@link IShellInterceptor} that launches a prepared command line as an
+ * operating-system process inside a given working directory.
+ */
+@Slf4j
+public abstract class BaseShellInterceptor implements IShellInterceptor {
+
+    /** Directory the process is started in. */
+    protected final String workingDirectory;
+    /** Command line handed to the process builder: the program followed by its arguments. */
+    protected final List<String> executeCommands;
+
+    /**
+     * @param executeCommands  command line to run, program first
+     * @param workingDirectory directory the process is started in
+     */
+    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
+        this.workingDirectory = workingDirectory;
+        this.executeCommands = executeCommands;
+    }
+
+    /**
+     * Starts the command in the working directory, with the error stream merged into the
+     * standard output stream, and logs the command line before launching it.
+     *
+     * @return the started process
+     * @throws IOException if the process cannot be started
+     */
+    @Override
+    public Process execute() throws IOException {
+        ProcessBuilder launcher = new ProcessBuilder()
+                .directory(new File(workingDirectory))
+                // callers only have to drain a single stream
+                .redirectErrorStream(true)
+                .command(executeCommands);
+        log.info("Executing shell command : {}", String.join(" ", executeCommands));
+        return launcher.start();
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java
new file mode 100644
index 0000000000..329e79661c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common state and fluent setters shared by the shell interceptor builders.
+ * <p>
+ * Subclasses decide how the collected settings are turned into a script file and a launch
+ * command.
+ *
+ * @param <T> concrete builder type, returned by the fluent setters
+ * @param <Y> type of interceptor produced by the builder
+ */
+public abstract class BaseShellInterceptorBuilder<T extends BaseShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        implements
+            IShellInterceptorBuilder<T, Y> {
+
+    /** Directory that holds the script file. */
+    protected String shellDirectory;
+
+    /** Name of the script file; subclasses append their own extension. */
+    protected String shellName;
+
+    /** System user the script is run as. */
+    protected String runUser;
+
+    /** CPU quota; how it is applied is up to the concrete builder. */
+    protected Integer cpuQuota;
+
+    /** Memory quota; how it is applied is up to the concrete builder. */
+    protected Integer memoryQuota;
+
+    /** System environment files, in the order they were appended. */
+    protected List<String> systemEnvs = new ArrayList<>();
+
+    /** Custom environment scripts, in the order they were appended. */
+    protected List<String> customEnvScripts = new ArrayList<>();
+
+    /** Kubernetes config YAML; not every builder makes use of it. */
+    protected String k8sConfigYaml;
+
+    /** Properties collected through {@link #properties(Map)}. */
+    protected Map<String, String> propertyMap = new HashMap<>();
+
+    /** Whether sudo mode was requested. */
+    protected boolean sudoEnable;
+
+    /** Script fragments, in the order they were appended. */
+    protected List<String> scripts = new ArrayList<>();
+
+    protected BaseShellInterceptorBuilder() {
+    }
+
+    /**
+     * Creates a new builder that starts with the settings of {@code builder}.
+     * <p>
+     * The lists and the property map are copied, so appending to one builder afterwards does not
+     * change the other one.
+     *
+     * @param builder builder whose settings are copied
+     * @return a new, independent builder
+     */
+    @Override
+    public T newBuilder(T builder) {
+        T copy = newBuilder();
+        copy.shellDirectory = builder.shellDirectory;
+        copy.shellName = builder.shellName;
+        copy.runUser = builder.runUser;
+        copy.cpuQuota = builder.cpuQuota;
+        copy.memoryQuota = builder.memoryQuota;
+        copy.k8sConfigYaml = builder.k8sConfigYaml;
+        copy.sudoEnable = builder.sudoEnable;
+        // defensive copies: sharing the mutable collections would couple both builders
+        copy.systemEnvs = new ArrayList<>(builder.systemEnvs);
+        copy.customEnvScripts = new ArrayList<>(builder.customEnvScripts);
+        copy.propertyMap = new HashMap<>(builder.propertyMap);
+        copy.scripts = new ArrayList<>(builder.scripts);
+        return copy;
+    }
+
+    @Override
+    public T shellDirectory(String shellDirectory) {
+        this.shellDirectory = shellDirectory;
+        return self();
+    }
+
+    @Override
+    public T shellName(String shellFilename) {
+        this.shellName = shellFilename;
+        return self();
+    }
+
+    @Override
+    public T runUser(String systemUser) {
+        this.runUser = systemUser;
+        return self();
+    }
+
+    @Override
+    public T cpuQuota(Integer cpuQuota) {
+        this.cpuQuota = cpuQuota;
+        return self();
+    }
+
+    @Override
+    public T memoryQuota(Integer memoryQuota) {
+        this.memoryQuota = memoryQuota;
+        return self();
+    }
+
+    @Override
+    public T appendSystemEnv(String envFiles) {
+        systemEnvs.add(envFiles);
+        return self();
+    }
+
+    @Override
+    public T appendCustomEnvScript(String customEnvScript) {
+        customEnvScripts.add(customEnvScript);
+        return self();
+    }
+
+    @Override
+    public T k8sConfigYaml(String k8sConfigYaml) {
+        this.k8sConfigYaml = k8sConfigYaml;
+        return self();
+    }
+
+    /**
+     * Adds all entries of {@code propertyMap} to the properties collected so far; a
+     * {@code null} or empty map is ignored.
+     */
+    @Override
+    public T properties(Map<String, String> propertyMap) {
+        if (MapUtils.isNotEmpty(propertyMap)) {
+            this.propertyMap.putAll(propertyMap);
+        }
+        return self();
+    }
+
+    @Override
+    public T sudoMode(boolean sudoEnable) {
+        this.sudoEnable = sudoEnable;
+        return self();
+    }
+
+    @Override
+    public T appendScript(String script) {
+        scripts.add(script);
+        return self();
+    }
+
+    /**
+     * Returns this builder as its concrete type so that the setters can be chained.
+     * The cast is safe as long as subclasses bind {@code T} to themselves, as the type bound asks.
+     */
+    @SuppressWarnings("unchecked")
+    private T self() {
+        return (T) this;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java
new file mode 100644
index 0000000000..9d31bf34c3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base builder for shell interceptors that run on Windows.
+ * <p>
+ * It renders the collected settings into a script file and assembles the command line that
+ * launches it. Concrete subclasses supply the header line, the interpreter and the file
+ * extension.
+ *
+ * @param <T> concrete builder type
+ * @param <Y> type of interceptor produced by the builder
+ */
+@Slf4j
+public abstract class BaseWindowsShellInterceptorBuilder<T extends BaseWindowsShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        extends
+            BaseShellInterceptorBuilder<T, Y> {
+
+    /**
+     * Writes the script file. Its lines are, in order: the shell header, a change into the
+     * script's own directory, one {@code call} per system env file, the custom env scripts and
+     * the script body.
+     *
+     * @throws IOException if the script file cannot be created or written
+     */
+    protected void generateShellScript() throws IOException {
+        List<String> finalScripts = new ArrayList<>();
+        // add shell header
+        finalScripts.add(shellHeader());
+        // %~dp0 expands to the drive and directory of the running script
+        finalScripts.add("cd /d %~dp0");
+        // add system env
+        finalScripts.addAll(systemEnvScript());
+        // add custom env
+        finalScripts.addAll(customEnvScript());
+        // add k8s config
+        finalScripts.addAll(k8sConfig());
+        // add shell body
+        finalScripts.add(shellBody());
+        // create shell file
+        String finalScript = String.join(System.lineSeparator(), finalScripts);
+        Path shellAbsolutePath = shellAbsolutePath();
+        FileUtils.createFileWith755(shellAbsolutePath);
+        // getBytes() without an argument encodes with the platform default charset
+        Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
+        log.info("Final Shell file is : \n{}", finalScript);
+    }
+
+    /**
+     * Joins the script fragments with the line separator and resolves the parameter
+     * placeholders against the collected properties.
+     *
+     * @return the script body, or an empty string when no fragment was appended
+     */
+    private String shellBody() {
+        if (CollectionUtils.isEmpty(scripts)) {
+            return StringUtils.EMPTY;
+        }
+        String scriptBody = String.join(System.lineSeparator(), scripts);
+        return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
+    }
+
+    /**
+     * A k8s config is not rendered on Windows, so this never contributes script lines.
+     * The warning is logged only when a config was actually supplied.
+     */
+    private Collection<String> k8sConfig() {
+        if (StringUtils.isNotEmpty(k8sConfigYaml)) {
+            log.warn("k8s config is not supported in windows");
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Builds the launch command: the shell interpreter followed by the absolute script path.
+     * Sudo mode is not available on Windows and only produces a warning.
+     * <p>
+     * NOTE(review): the interpreter is passed the script path without any switch; for
+     * {@code cmd.exe} a script is normally run through {@code /c} - confirm the script is executed.
+     *
+     * @return a new mutable list with the command tokens
+     */
+    protected List<String> generateBootstrapCommand() {
+        if (sudoEnable) {
+            log.warn("sudo is not supported in windows");
+        }
+        // todo: support tenant in windows
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add(shellInterpreter());
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    /** First line written to the script file. */
+    protected abstract String shellHeader();
+
+    /** Program used to launch the script file. */
+    protected abstract String shellInterpreter();
+
+    /** Extension appended to the script name, including the leading dot. */
+    protected abstract String shellExtension();
+
+    /** Turns every system env file into a {@code call <file>} script line. */
+    private List<String> systemEnvScript() {
+        if (CollectionUtils.isEmpty(systemEnvs)) {
+            return Collections.emptyList();
+        }
+        return systemEnvs.stream()
+                .map(systemEnv -> "call " + systemEnv)
+                .collect(Collectors.toList());
+    }
+
+    /** Returns the custom env scripts unchanged; they are written to the script file as they are. */
+    private List<String> customEnvScript() {
+        if (CollectionUtils.isEmpty(customEnvScripts)) {
+            return Collections.emptyList();
+        }
+        return customEnvScripts;
+    }
+
+    /** Location of the script file: the shell directory plus the shell name and extension. */
+    private Path shellAbsolutePath() {
+        return Paths.get(shellDirectory, shellName + shellExtension());
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java
new file mode 100644
index 0000000000..45b9bf96f7
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.IOException;
+
+/**
+ * This interface is used to execute shell commands.
+ * It should be created by {@link IShellInterceptorBuilder}.
+ */
+public interface IShellInterceptor {
+
+    /**
+     * Executes the shell command held by this interceptor.
+     *
+     * @return the {@link Process} produced by the execution
+     * @throws IOException if the command cannot be started
+     */
+    Process execute() throws IOException;
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java
new file mode 100644
index 0000000000..cab50e3bdf
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Fluent builder that collects everything needed to produce an {@link IShellInterceptor}.
+ *
+ * @param <T> concrete builder type returned by the fluent methods
+ * @param <Y> type of interceptor created by {@link #build()}
+ */
+public interface IShellInterceptorBuilder<T extends IShellInterceptorBuilder<T, Y>, Y extends IShellInterceptor> {
+
+    /** Creates a new builder of the same type. */
+    T newBuilder();
+
+    /** Creates a new builder that starts with the settings of the given builder. */
+    T newBuilder(T builder);
+
+    /** Sets the directory of the shell script file. */
+    T shellDirectory(String directory);
+
+    /** Sets the name of the shell script file. */
+    T shellName(String shellFilename);
+
+    /** Sets the system user the shell is run as. */
+    T runUser(String systemUser);
+
+    /** Sets the CPU quota for the execution. */
+    T cpuQuota(Integer cpuQuota);
+
+    /** Sets the memory quota for the execution. */
+    T memoryQuota(Integer memoryQuota);
+
+    /** Appends a system environment file. */
+    T appendSystemEnv(String envFiles);
+
+    /** Appends a custom environment script. */
+    T appendCustomEnvScript(String customEnvScript);
+
+    /** Sets the Kubernetes config YAML; presumably not every implementation supports it. */
+    T k8sConfigYaml(String k8sConfigYaml);
+
+    /** Adds properties to the builder. */
+    T properties(Map<String, String> propertyMap);
+
+    /** Enables or disables sudo mode. */
+    T sudoMode(boolean sudoEnable);
+
+    /** Appends a script fragment. */
+    T appendScript(String script);
+
+    /**
+     * Builds the interceptor from the collected settings.
+     *
+     * @return the interceptor
+     * @throws IOException if the interceptor cannot be prepared
+     */
+    Y build() throws IOException;
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java
new file mode 100644
index 0000000000..9654d091cd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.cmd.CmdShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.sh.ShShellInterceptorBuilder;
+
+/**
+ * Factory that picks the {@link IShellInterceptorBuilder} implementation named by the
+ * {@code shell.interceptor.type} property. The property is read once, when this class is
+ * initialised, with {@code bash} passed as the default value.
+ */
+public class ShellInterceptorBuilderFactory {
+
+    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
+
+    /**
+     * Creates a fresh builder for the configured interceptor type; the type is matched
+     * case-insensitively.
+     *
+     * @return a new bash, sh or cmd builder
+     * @throws IllegalArgumentException if the configured type is none of bash, sh and cmd
+     */
+    @SuppressWarnings("unchecked")
+    public static IShellInterceptorBuilder newBuilder() {
+        final IShellInterceptorBuilder selected;
+        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
+            selected = new BashShellInterceptorBuilder();
+        } else if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
+            selected = new ShShellInterceptorBuilder();
+        } else if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
+            selected = new CmdShellInterceptorBuilder();
+        } else {
+            throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
+        }
+        return selected;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java
new file mode 100644
index 0000000000..f0a237c742
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * Shell interceptor created by {@code BashShellInterceptorBuilder}; all behaviour is inherited
+ * from {@link BaseShellInterceptor}.
+ */
+public class BashShellInterceptor extends BaseShellInterceptor {
+
+    /**
+     * @param executeCommands  command line to run, passed on to the base class
+     * @param workingDirectory working directory, passed on to the base class
+     */
+    public BashShellInterceptor(List<String> executeCommands, String workingDirectory) {
+        super(executeCommands, workingDirectory);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java
new file mode 100644
index 0000000000..15ffd2cc88
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link BashShellInterceptor}: scripts start with {@code #!/bin/bash}, use the
+ * {@code .sh} extension and are launched through {@code bash}.
+ */
+public class BashShellInterceptorBuilder
+        extends
+            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
+
+    @Override
+    protected String shellHeader() {
+        return "#!/bin/bash";
+    }
+
+    @Override
+    protected String shellInterpreter() {
+        return "bash";
+    }
+
+    @Override
+    protected String shellExtension() {
+        return ".sh";
+    }
+
+    @Override
+    public BashShellInterceptorBuilder newBuilder() {
+        return new BashShellInterceptorBuilder();
+    }
+
+    /**
+     * Generates the script file first, then wraps the bootstrap command into an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException propagated from the script generation
+     */
+    @Override
+    public BashShellInterceptor build() throws IOException {
+        generateShellScript();
+        return new BashShellInterceptor(generateBootstrapCommand(), shellDirectory);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java
new file mode 100644
index 0000000000..cb4d8b05d6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * Shell interceptor created by {@code CmdShellInterceptorBuilder}; all behaviour is inherited
+ * from {@link BaseShellInterceptor}.
+ */
+public class CmdShellInterceptor extends BaseShellInterceptor {
+
+    /**
+     * @param executeCommands  command line to run, passed on to the base class
+     * @param workingDirectory working directory, passed on to the base class
+     */
+    protected CmdShellInterceptor(List<String> executeCommands, String workingDirectory) {
+        super(executeCommands, workingDirectory);
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java
new file mode 100644
index 0000000000..61f7725fe0
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseWindowsShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link CmdShellInterceptor}: scripts start with {@code @echo off}, use the
+ * {@code .bat} extension and are launched through {@code cmd.exe}.
+ */
+public class CmdShellInterceptorBuilder
+        extends
+            BaseWindowsShellInterceptorBuilder<CmdShellInterceptorBuilder, CmdShellInterceptor> {
+
+    @Override
+    public CmdShellInterceptorBuilder newBuilder() {
+        return new CmdShellInterceptorBuilder();
+    }
+
+    /**
+     * Generates the script file first, then wraps the bootstrap command into an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException propagated from the script generation
+     */
+    @Override
+    public CmdShellInterceptor build() throws IOException {
+        generateShellScript();
+        return new CmdShellInterceptor(generateBootstrapCommand(), shellDirectory);
+    }
+
+    @Override
+    protected String shellHeader() {
+        return "@echo off";
+    }
+
+    @Override
+    protected String shellInterpreter() {
+        return "cmd.exe";
+    }
+
+    @Override
+    protected String shellExtension() {
+        return ".bat";
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java
new file mode 100644
index 0000000000..fae2a8462c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * Shell interceptor created by {@code ShShellInterceptorBuilder}; all behaviour is inherited
+ * from {@link BaseShellInterceptor}.
+ */
+public class ShShellInterceptor extends BaseShellInterceptor {
+
+    /**
+     * @param executeCommands command line to run, passed on to the base class
+     * @param shellDirectory  passed on to the base class as the working directory
+     */
+    protected ShShellInterceptor(List<String> executeCommands, String shellDirectory) {
+        super(executeCommands, shellDirectory);
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java
new file mode 100644
index 0000000000..cef1e5843a
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link ShShellInterceptor}: scripts start with {@code #!/bin/sh}, use the
+ * {@code .sh} extension and are launched through {@code sh}.
+ */
+public class ShShellInterceptorBuilder
+        extends
+            BaseLinuxShellInterceptorBuilder<ShShellInterceptorBuilder, ShShellInterceptor> {
+
+    @Override
+    public ShShellInterceptorBuilder newBuilder() {
+        return new ShShellInterceptorBuilder();
+    }
+
+    /**
+     * Stores the Kubernetes config YAML through the inherited setter.
+     * <p>
+     * This method must return the builder itself: returning {@code null} would make any
+     * fluent chain that continues after this call fail with a {@link NullPointerException}.
+     *
+     * @param k8sConfigYaml Kubernetes config YAML
+     * @return this builder
+     */
+    @Override
+    public ShShellInterceptorBuilder k8sConfigYaml(String k8sConfigYaml) {
+        return super.k8sConfigYaml(k8sConfigYaml);
+    }
+
+    /**
+     * Generates the script file first, then wraps the bootstrap command into an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException propagated from the script generation
+     */
+    @Override
+    public ShShellInterceptor build() throws IOException {
+        generateShellScript();
+        List<String> bootstrapCommand = generateBootstrapCommand();
+        return new ShShellInterceptor(bootstrapCommand, shellDirectory);
+    }
+
+    @Override
+    protected String shellHeader() {
+        return "#!/bin/sh";
+    }
+
+    @Override
+    protected String shellInterpreter() {
+        return "sh";
+    }
+
+    @Override
+    protected String shellExtension() {
+        return ".sh";
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 92bd3ea69e..2e4731d704 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.plugin.task.chunjun;
 
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -30,25 +29,21 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import org.apache.dolphinscheduler.spi.enums.Flag;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.SystemUtils;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * chunjun task
@@ -102,19 +97,16 @@ public class ChunJunTask extends AbstractTask {
         }
     }
 
-    /**
-     * run chunjun process
-     *
-     * @throws TaskException exception
-     */
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
 
-            String jsonFilePath = buildChunJunJsonFile(paramsMap);
-            String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(paramsMap))
+                    .appendScript(buildCommand(buildChunJunJsonFile(paramsMap)));
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
 
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
 
@@ -167,25 +159,7 @@ public class ChunJunTask extends AbstractTask {
         return fileName;
     }
 
-    /**
-     * create command
-     *
-     * @return shell command file name
-     * @throws Exception if error throws Exception
-     */
-    private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
-        // generate scripts
-        String fileName = String.format("%s/%s_node.%s",
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId(),
-                SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
-        Path path = new File(fileName).toPath();
-
-        if (Files.exists(path)) {
-            return fileName;
-        }
-
+    private String buildCommand(String jobConfigFilePath) {
         // chunjun command
         List<String> args = new ArrayList<>();
 
@@ -215,24 +189,7 @@ public class ChunJunTask extends AbstractTask {
 
         String command = String.join(" ", args);
 
-        // replace placeholder
-        String chunjunCommand = ParameterUtils.convertParameterPlaceholders(command, ParameterUtils.convert(paramsMap));
-
-        log.info("raw script : {}", chunjunCommand);
-
-        // create shell command file
-        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
-        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
-        if (SystemUtils.IS_OS_WINDOWS) {
-            Files.createFile(path);
-        } else {
-            Files.createFile(path, attr);
-        }
-
-        Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND);
-
-        return fileName;
+        return command;
     }
 
     public String getExecMode(ChunJunParameters chunJunParameters) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 5fe451d916..381b8cbd37 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.dataquality.DataQualityParameters;
@@ -57,6 +56,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * In DataQualityTask, the input parameters will be converted into DataQualityConfiguration,
@@ -160,19 +160,16 @@ public class DataQualityTask extends AbstractYarnTask {
     }
 
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         List<String> args = new ArrayList<>();
-
         args.add(SPARK_COMMAND);
         args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
+        return args.stream().collect(Collectors.joining(" "));
+    }
 
-        // replace placeholder
-        Map<String, Property> paramsMap = dqTaskExecutionContext.getPrepareParamsMap();
-        String command =
-                ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
-        log.info("data quality task command: {}", command);
-
-        return command;
+    @Override
+    protected Map<String, String> getProperties() {
+        return ParameterUtils.convert(dqTaskExecutionContext.getPrepareParamsMap());
     }
 
     protected void setMainJarName() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 4477b42eb7..80de6cc7cd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax;
 
 import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
 
 import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -33,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -40,16 +41,11 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -59,7 +55,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import com.alibaba.druid.sql.ast.SQLStatement;
@@ -152,21 +147,18 @@ public class DataxTask extends AbstractTask {
                 dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
     }
 
-    /**
-     * run DataX process
-     *
-     * @throws TaskException if error throws Exception
-     */
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             // replace placeholder,and combine local and global parameters
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
 
-            // run datax processDataSourceService
-            String jsonFilePath = buildDataxJsonFile(paramsMap);
-            String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(paramsMap))
+                    .appendScript(buildCommand(buildDataxJsonFile(paramsMap), paramsMap));
+
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
 
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
@@ -385,23 +377,10 @@ public class DataxTask extends AbstractTask {
      * create command
      *
-     * @return shell command file name
+     * @return datax python command line
-     * @throws Exception if error throws Exception
      */
-    private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
-        // generate scripts
-        String fileName = String.format("%s/%s_node.%s",
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId(),
-                SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
-        Path path = new File(fileName).toPath();
-
-        if (Files.exists(path)) {
-            return fileName;
-        }
-
+    protected String buildCommand(String jobConfigFilePath, Map<String, Property> paramsMap) {
         // datax python command
-        String sbr = DATAX_PYTHON +
+        return DATAX_PYTHON +
                 " " +
                 DATAX_PATH +
                 " " +
@@ -409,25 +388,6 @@ public class DataxTask extends AbstractTask {
                 addCustomParameters(paramsMap) +
                 " " +
                 jobConfigFilePath;
-
-        // replace placeholder
-        String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParameterUtils.convert(paramsMap));
-
-        log.debug("raw script : {}", dataxCommand);
-
-        // create shell command file
-        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
-        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
-        if (SystemUtils.IS_OS_WINDOWS) {
-            Files.createFile(path);
-        } else {
-            Files.createFile(path, attr);
-        }
-
-        Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
-
-        return fileName;
     }
 
     private StringBuilder addCustomParameters(Map<String, Property> paramsMap) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
index d5054e836d..e423fcb89d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.datax;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -40,8 +41,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
-import org.apache.commons.lang3.SystemUtils;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -110,7 +109,7 @@ public class DataxTaskTest {
         taskResponse.setStatus(TaskRunStatus.SUCCESS);
         taskResponse.setExitStatusCode(0);
         taskResponse.setProcessId(1);
-        when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
+        when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
 
         dataxTask.handle(taskCallBack);
         Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -122,14 +121,8 @@ public class DataxTaskTest {
         boolean delete = jsonFile.delete();
         Assertions.assertTrue(delete);
 
-        File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
-                : new File("/tmp/execution/app-id_node.sh");
-        InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
-        String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
-        Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py  --jvm=\"-Xms1G -Xmx1G\" " +
-                " /tmp/execution/app-id_job.json");
-        delete = shellCommandFile.delete();
-        Assertions.assertTrue(delete);
+        Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", null),
+                "python2.7 ${DATAX_HOME}/bin/datax.py  --jvm=\"-Xms1G -Xmx1G\"  /tmp/execution/app-id_job.json");
     }
 
     @Test
@@ -151,7 +144,7 @@ public class DataxTaskTest {
         taskResponse.setStatus(TaskRunStatus.SUCCESS);
         taskResponse.setExitStatusCode(0);
         taskResponse.setProcessId(1);
-        when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
+        when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
 
         dataxTask.handle(taskCallBack);
         Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -163,14 +156,8 @@ public class DataxTaskTest {
         boolean delete = jsonFile.delete();
         Assertions.assertTrue(delete);
 
-        File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
-                : new File("/tmp/execution/app-id_node.sh");
-        InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
-        String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
-        Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py  --jvm=\"-Xms1G -Xmx1G\" " +
-                "-p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
-        delete = shellCommandFile.delete();
-        Assertions.assertTrue(delete);
+        Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", createPrepareParamsMap()),
+                "python2.7 ${DATAX_HOME}/bin/datax.py  --jvm=\"-Xms1G -Xmx1G\" -p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
     }
 
     @Test
@@ -187,7 +174,7 @@ public class DataxTaskTest {
         shellCommandExecutorFiled.setAccessible(true);
         shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
 
-        when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+        when(shellCommandExecutor.run(any(), eq(taskCallBack)))
                 .thenThrow(new InterruptedException("Command execution failed"));
         Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
     }
@@ -206,7 +193,7 @@ public class DataxTaskTest {
         shellCommandExecutorFiled.setAccessible(true);
         shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
 
-        when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+        when(shellCommandExecutor.run(any(), eq(taskCallBack)))
                 .thenThrow(new IOException("Command execution failed"));
         Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 37d2d2f8f9..032d953858 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -78,8 +80,9 @@ public class DvcTask extends AbstractTask {
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             // construct process
-            String command = buildCommand();
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(buildCommand());
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
             parameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index bc53696077..3a71df62c7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public class FlinkStreamTask extends FlinkTask implements StreamTask {
 
@@ -66,15 +67,15 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
      * @return command
      */
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         // flink run/run-application [OPTIONS] <jar-file> <arguments>
         List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+        return args.stream().collect(Collectors.joining(" "));
+    }
 
-        String command = ParameterUtils
-                .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
-        log.info("flink task command : {}", command);
-        return command;
+    @Override
+    protected Map<String, String> getProperties() {
+        return taskExecutionContext.getDefinedParams();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 33afd7fc7f..75764c3677 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class FlinkTask extends AbstractYarnTask {
 
@@ -69,15 +70,15 @@ public class FlinkTask extends AbstractYarnTask {
      * @return command
      */
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         // flink run/run-application [OPTIONS] <jar-file> <arguments>
         List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+        return args.stream().collect(Collectors.joining(" "));
+    }
 
-        String command = ParameterUtils
-                .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
-        log.info("flink task command : {}", command);
-        return command;
+    @Override
+    protected Map<String, String> getProperties() {
+        return taskExecutionContext.getDefinedParams();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 8ba8409567..28443423c2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.io.FileUtils;
@@ -88,7 +90,9 @@ public class HiveCliTask extends AbstractRemoteTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(buildCommand());
+            final TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 9486ba5003..e9850b6459 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@@ -122,7 +124,9 @@ public class JavaTask extends AbstractTask {
                     throw new RunTypeNotFoundException("run type is required, but it is null now.");
             }
             Preconditions.checkNotNull(command, "command not be null.");
-            TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(command);
+            TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             log.info("java task run result: {}", taskResponse);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index e3058ee329..735b6237e2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -82,7 +84,11 @@ public class JupyterTask extends AbstractRemoteTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    .appendScript(buildCommand());
+
+            TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(response.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(response.getProcessId());
@@ -147,14 +153,7 @@ public class JupyterTask extends AbstractRemoteTask {
             args.add(String.format(JupyterConstants.REMOVE_ENV, timestamp));
         }
 
-        // replace placeholder
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-        String command = ParameterUtils
-                .convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
-
-        log.info("jupyter task command: {}", command);
-
-        return command;
+        return args.stream().collect(Collectors.joining(" "));
     }
 
     protected String readCondaPath() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index 132e6f6e56..c091ee6f64 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.lang3.BooleanUtils;
@@ -37,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -100,8 +100,10 @@ public class LinkisTask extends AbstractRemoteTask {
     public void submitApplication() throws TaskException {
         try {
             // construct process
-            String command = buildCommand();
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    .appendScript(buildCommand());
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(findTaskId(commandExecuteResult.getResultString()));
             setProcessId(commandExecuteResult.getProcessId());
@@ -127,7 +129,9 @@ public class LinkisTask extends AbstractRemoteTask {
             args.add(Constants.STATUS_OPTIONS);
             args.add(taskId);
             String command = String.join(Constants.SPACE, args);
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(command);
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
             String status = findStatus(commandExecuteResult.getResultString());
             LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
             switch (jobStatus) {
@@ -160,7 +164,10 @@ public class LinkisTask extends AbstractRemoteTask {
             args.add(Constants.KILL_OPTIONS);
             args.add(taskId);
             String command = String.join(Constants.SPACE, args);
-            shellCommandExecutor.run(command, null);
+
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(command);
+            shellCommandExecutor.run(shellActuatorBuilder, null);
             setExitStatusCode(EXIT_CODE_KILL);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -176,7 +183,7 @@ public class LinkisTask extends AbstractRemoteTask {
         List<String> args = new ArrayList<>();
         args.addAll(buildOptions());
 
-        String command = String.join(Constants.SPACE, args);
+        String command = String.join(" ", args);
         log.info("Linkis task command: {}", command);
 
         return command;
@@ -187,20 +194,13 @@ public class LinkisTask extends AbstractRemoteTask {
         args.add(Constants.SHELL_CLI_OPTIONS);
         args.add(Constants.ASYNC_OPTIONS);
         if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) {
-            args.add(buildCustomConfigContent());
+            args.add(linkisParameters.getRawScript());
         } else {
             args.add(buildParamConfigContent());
         }
         return args;
     }
 
-    private String buildCustomConfigContent() {
-        log.info("raw custom config content : {}", linkisParameters.getRawScript());
-        String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n");
-        script = parseScript(script);
-        return script;
-    }
-
     private String buildParamConfigContent() {
         log.info("raw param config content : {}", linkisParameters.getParamScript());
         String script = "";
@@ -210,7 +210,6 @@ public class LinkisTask extends AbstractRemoteTask {
                     .concat(Constants.SPACE)
                     .concat(param.getValue());
         }
-        script = parseScript(script);
         return script;
     }
 
@@ -248,8 +247,4 @@ public class LinkisTask extends AbstractRemoteTask {
         return linkisParameters;
     }
 
-    private String parseScript(String script) {
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-        return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
-    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index e4fe074c05..3c1f73747c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * shell task
@@ -110,12 +113,15 @@ public class MlflowTask extends AbstractTask {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
             // construct process
-            String command = buildCommand();
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(getParamsMap()))
+                    .appendScript(buildCommand());
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             int exitCode;
             if (mlflowParameters.getIsDeployDocker()) {
                 exitCode = checkDockerHealth();
@@ -165,7 +171,6 @@ public class MlflowTask extends AbstractTask {
      */
     private String buildCommandForMlflowProjects() {
 
-        Map<String, Property> paramsMap = getParamsMap();
         List<String> args = new ArrayList<>();
         args.add(
                 String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@@ -219,8 +224,7 @@ public class MlflowTask extends AbstractTask {
             runCommand = runCommand + " " + versionString;
         }
         args.add(runCommand);
-
-        return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+        return args.stream().collect(Collectors.joining("\n"));
     }
 
     /**
@@ -228,7 +232,6 @@ public class MlflowTask extends AbstractTask {
      */
     protected String buildCommandForMlflowModels() {
 
-        Map<String, Property> paramsMap = getParamsMap();
         List<String> args = new ArrayList<>();
         args.add(
                 String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@@ -247,8 +250,7 @@ public class MlflowTask extends AbstractTask {
             args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(),
                     imageName));
         }
-
-        return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+        return args.stream().collect(Collectors.joining("\n"));
     }
 
     private Map<String, Property> getParamsMap() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index 2ae5c6e707..dd51314dfa 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * mapreduce task
@@ -90,19 +91,19 @@ public class MapReduceTask extends AbstractYarnTask {
      * @return command
      */
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         // hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
         List<String> args = new ArrayList<>();
         args.add(MAPREDUCE_COMMAND);
 
         // other parameters
         args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext));
+        return args.stream().collect(Collectors.joining(" "));
+    }
 
-        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
-                taskExecutionContext.getDefinedParams());
-        log.info("mapreduce task command: {}", command);
-
-        return command;
+    @Override
+    protected Map<String, String> getProperties() {
+        return taskExecutionContext.getDefinedParams();
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index f60d5f1107..4b4d8cbd66 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.io.FileUtils;
@@ -86,6 +88,7 @@ public class PythonTask extends AbstractTask {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
@@ -96,9 +99,11 @@ public class PythonTask extends AbstractTask {
 
             // create this file
             createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile);
-            String command = buildPythonExecuteCommand(pythonScriptFile);
 
-            TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(buildPythonExecuteCommand(pythonScriptFile));
+
+            TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
             setVarPool(shellCommandExecutor.getVarPool());
@@ -165,9 +170,8 @@ public class PythonTask extends AbstractTask {
      * build python script content
      *
      * @return raw python script
-     * @throws Exception exception
      */
-    protected String buildPythonScriptContent() throws Exception {
+    protected String buildPythonScriptContent() {
         log.info("raw python script : {}", pythonParameters.getRawScript());
         String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
         Map<String, Property> paramsMap = mergeParamsWithContext(pythonParameters);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index afe371628c..806750877c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -24,14 +24,15 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 public class PytorchTask extends AbstractTask {
 
@@ -64,11 +65,15 @@ public class PytorchTask extends AbstractTask {
         pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion());
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            String command = buildPythonExecuteCommand();
-            TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    .appendScript(buildPythonExecuteCommand());
+
+            TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setProcessId(taskResponse.getProcessId());
             setVarPool(shellCommandExecutor.getVarPool());
@@ -116,9 +121,7 @@ public class PytorchTask extends AbstractTask {
             args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath()));
 
         }
-
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-        return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+        return args.stream().collect(Collectors.joining("\n"));
     }
 
     private String getPythonCommand() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index bbf42ed8fa..0b837fa568 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.apache.commons.io.FileUtils;
@@ -100,7 +102,10 @@ public class SeatunnelTask extends AbstractRemoteTask {
         try {
             // construct process
             String command = buildCommand();
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .appendScript(command);
+
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
             setProcessId(commandExecuteResult.getProcessId());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 7d8fdd2e5a..c3c2bf62a1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -19,26 +19,18 @@ package org.apache.dolphinscheduler.plugin.task.shell;
 
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 
-import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.Map;
-
 /**
  * shell task
  */
@@ -84,12 +76,15 @@ public class ShellTask extends AbstractTask {
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         try {
-            // construct process
-            String command = buildCommand();
-            TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+            IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+                    .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+                    .appendScript(shellParameters.getRawScript());
+
+            TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setProcessId(commandExecuteResult.getProcessId());
             shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
@@ -115,47 +110,9 @@ public class ShellTask extends AbstractTask {
         }
     }
 
-    /**
-     * create command
-     *
-     * @return file name
-     * @throws Exception exception
-     */
-    private String buildCommand() throws Exception {
-        // generate scripts
-        String fileName = String.format("%s/%s_node.%s",
-                taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
-        File file = new File(fileName);
-        Path path = file.toPath();
-
-        if (Files.exists(path)) {
-            // this shouldn't happen
-            log.warn("The command file: {} is already exist", path);
-            return fileName;
-        }
-
-        String script = shellParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
-        script = parseScript(script);
-        shellParameters.setRawScript(script);
-
-        log.info("raw script : {}", shellParameters.getRawScript());
-        log.info("task execute path : {}", taskExecutionContext.getExecutePath());
-
-        FileUtils.createFileWith755(path);
-        Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-
-        return fileName;
-    }
-
     @Override
     public AbstractParameters getParameters() {
         return shellParameters;
     }
 
-    private String parseScript(String script) {
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-        return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
-    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 675320d49f..ef0b71c8fe 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import io.fabric8.kubernetes.client.Config;
 
@@ -88,13 +89,8 @@ public class SparkTask extends AbstractYarnTask {
         log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
     }
 
-    /**
-     * create command
-     *
-     * @return command
-     */
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         /**
          * (1) spark-submit [options] <app jar | python file> [app arguments]
          * (2) spark-sql [options] -f <filename>
@@ -116,14 +112,12 @@ public class SparkTask extends AbstractYarnTask {
         args.addAll(populateSparkOptions());
 
         // replace placeholder
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-
-        String command =
-                ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
-
-        log.info("spark task command: {}", command);
+        return args.stream().collect(Collectors.joining(" "));
+    }
 
-        return command;
+    @Override
+    protected Map<String, String> getProperties() {
+        return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
     }
 
     /**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 14ccbcd56f..ba031bb304 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -54,7 +54,7 @@ public class SparkTaskTest {
                         "--conf spark.executor.memory=1G " +
                         "--name sparksql " +
                         "-f /tmp/5536_node.sql",
-                sparkTask.buildCommand());
+                sparkTask.getScript());
     }
 
     @Test
@@ -79,7 +79,7 @@ public class SparkTaskTest {
                         "--conf spark.executor.memory=1G " +
                         "--name spark " +
                         "/lib/dolphinscheduler-task-spark.jar",
-                sparkTask.buildCommand());
+                sparkTask.getScript());
     }
 
     private String buildSparkParametersWithSparkSql() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index b8d35e4fdf..016a07a56b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator;
@@ -72,17 +71,16 @@ public class SqoopTask extends AbstractYarnTask {
     }
 
     @Override
-    protected String buildCommand() {
+    protected String getScript() {
         // get sqoop scripts
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
+        return generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
 
-        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-
-        String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
-        log.info("sqoop script: {}", resultScripts);
-        return resultScripts;
+    }
 
+    @Override
+    protected Map<String, String> getProperties() {
+        return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
     }
 
     @Override