You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2023/07/19 06:50:10 UTC
[dolphinscheduler] branch dev updated: Support execute shell in different interceptor (#14582)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c30cca9d9a Support execute shell in different interceptor (#14582)
c30cca9d9a is described below
commit c30cca9d9a0386002437829de9310b592c6d54bb
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 19 14:50:04 2023 +0800
Support execute shell in different interceptor (#14582)
---
.../src/main/resources/common.properties | 2 +
.../plugin/task/api/AbstractCommandExecutor.java | 153 ++++++------------
.../plugin/task/api/AbstractYarnTask.java | 36 ++---
.../plugin/task/api/ShellCommandExecutor.java | 107 -------------
.../plugin/task/api/TaskExecutionContext.java | 1 +
.../shell/BaseLinuxShellInterceptorBuilder.java | 171 +++++++++++++++++++++
.../task/api/shell/BaseShellInterceptor.java | 50 ++++++
.../api/shell/BaseShellInterceptorBuilder.java | 141 +++++++++++++++++
.../shell/BaseWindowsShellInterceptorBuilder.java | 116 ++++++++++++++
.../plugin/task/api/shell/IShellInterceptor.java | 30 ++++
.../task/api/shell/IShellInterceptorBuilder.java | 52 +++++++
.../api/shell/ShellInterceptorBuilderFactory.java | 43 ++++++
.../task/api/shell/bash/BashShellInterceptor.java | 30 ++++
.../shell/bash/BashShellInterceptorBuilder.java | 56 +++++++
.../task/api/shell/cmd/CmdShellInterceptor.java | 29 ++++
.../api/shell/cmd/CmdShellInterceptorBuilder.java | 55 +++++++
.../task/api/shell/sh/ShShellInterceptor.java | 29 ++++
.../api/shell/sh/ShShellInterceptorBuilder.java | 60 ++++++++
.../plugin/task/chunjun/ChunJunTask.java | 61 ++------
.../plugin/task/dq/DataQualityTask.java | 17 +-
.../plugin/task/datax/DataxTask.java | 60 ++------
.../plugin/task/datax/DataxTaskTest.java | 31 ++--
.../dolphinscheduler/plugin/task/dvc/DvcTask.java | 7 +-
.../plugin/task/flink/FlinkStreamTask.java | 15 +-
.../plugin/task/flink/FlinkTask.java | 15 +-
.../plugin/task/hivecli/HiveCliTask.java | 6 +-
.../plugin/task/java/JavaTask.java | 6 +-
.../plugin/task/jupyter/JupyterTask.java | 19 ++-
.../plugin/task/linkis/LinkisTask.java | 35 ++---
.../plugin/task/mlflow/MlflowTask.java | 18 ++-
.../plugin/task/mr/MapReduceTask.java | 13 +-
.../plugin/task/python/PythonTask.java | 12 +-
.../plugin/task/pytorch/PytorchTask.java | 17 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 7 +-
.../plugin/task/shell/ShellTask.java | 59 +------
.../plugin/task/spark/SparkTask.java | 20 +--
.../plugin/task/spark/SparkTaskTest.java | 4 +-
.../plugin/task/sqoop/SqoopTask.java | 14 +-
38 files changed, 1081 insertions(+), 516 deletions(-)
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index e7c119197e..3107929d56 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -153,6 +153,8 @@ appId.collect=log
# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
shell.env_source_list=
+# The interceptor type of Shell task, e.g. bash, sh, cmd
+shell.interceptor.type=bash
# Whether to enable remote logging
remote.logging.enable=false
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 0967c642fc..4c8edc6dac 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -25,22 +25,20 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
-import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -114,111 +112,61 @@ public abstract class AbstractCommandExecutor {
}
}
- public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
- this.logBuffer = logBuffer;
- }
-
- /**
- * build process
- *
- * @param commandFile command file
- * @throws IOException IO Exception
- */
- private void buildProcess(String commandFile) throws IOException {
- // setting up user to run commands
- List<String> command = new LinkedList<>();
-
- // init process builder
- ProcessBuilder processBuilder = new ProcessBuilder();
- // setting up a working directory
- processBuilder.directory(new File(taskRequest.getExecutePath()));
- // merge error information to standard output stream
- processBuilder.redirectErrorStream(true);
-
- // if sudo.enable=true,setting up user to run commands
- // todo: Create a ShellExecuteClass to generate the shell and execute shell commands
- if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
- if (SystemUtils.IS_OS_LINUX
- && PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
- generateCgroupCommand(command);
- } else {
- command.add("sudo");
- command.add("-u");
- command.add(taskRequest.getTenantCode());
- command.add("-E");
- }
- }
- command.add(commandInterpreter());
- command.add(commandFile);
-
- // setting commands
- processBuilder.command(command);
- process = processBuilder.start();
-
- printCommand(command);
- }
-
- /**
- * generate systemd command.
- * eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root
- * @param command command
- */
- private void generateCgroupCommand(List<String> command) {
- Integer cpuQuota = taskRequest.getCpuQuota();
- Integer memoryMax = taskRequest.getMemoryMax();
-
- command.add("sudo");
- command.add("systemd-run");
- command.add("-q");
- command.add("--scope");
-
- if (cpuQuota == -1) {
- command.add("-p");
- command.add("CPUQuota=");
- } else {
- command.add("-p");
- command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
- }
-
- // use `man systemd.resource-control` to find available parameter
- if (memoryMax == -1) {
- command.add("-p");
- command.add(String.format("MemoryLimit=%s", "infinity"));
- } else {
- command.add("-p");
- command.add(String.format("MemoryLimit=%sM", taskRequest.getMemoryMax()));
- }
-
- command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
- }
-
- public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception {
+ // todo: We need to build the IShellInterceptor in the outer class, since different tasks may have specific logic
+ // to build the IShellInterceptor
+ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
+ TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+ logger.warn(
+ "Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
+ taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
- if (StringUtils.isEmpty(execCommand)) {
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
- return result;
+ iShellInterceptorBuilder = iShellInterceptorBuilder
+ .shellDirectory(taskRequest.getExecutePath())
+ .shellName(taskRequest.getTaskAppId());
+ // Set system env
+ if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
+ ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
+ }
+ // Set custom env
+ if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
+ iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
+ }
+ // Set k8s config (this only works on Linux)
+ if (taskRequest.getK8sTaskExecutionContext() != null) {
+ iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
+ }
+ // Set sudo (this only works on Linux)
+ iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
+ // Set tenant (this only works on Linux)
+ if (TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
+ iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER);
+ } else {
+ iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
+ }
+ // Set CPU quota (this only works on Linux)
+ if (taskRequest.getCpuQuota() != null) {
+ iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
+ }
+ // Set memory quota (this only works on Linux)
+ if (taskRequest.getMemoryMax() != null) {
+ iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
}
- String commandFilePath = buildCommandFilePath();
-
- // create command file if not exists
- createCommandFileIfNotExists(execCommand, commandFilePath);
-
- // build process
- buildProcess(commandFilePath);
+ IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
+ process = iShellInterceptor.execute();
// parse process output
- parseProcessOutput(process);
+ parseProcessOutput(this.process);
// collect pod log
collectPodLogIfNeeded();
- int processId = getProcessId(process);
+ int processId = getProcessId(this.process);
result.setProcessId(processId);
@@ -243,7 +191,7 @@ public abstract class AbstractCommandExecutor {
}
// waiting for the run to finish
- boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
+ boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
@@ -272,7 +220,7 @@ public abstract class AbstractCommandExecutor {
if (status && kubernetesStatus.isSuccess()) {
// SHELL task state
- result.setExitStatusCode(process.exitValue());
+ result.setExitStatusCode(this.process.exitValue());
} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
@@ -280,7 +228,7 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
- int exitCode = process.exitValue();
+ int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
@@ -446,9 +394,4 @@ public abstract class AbstractCommandExecutor {
return processId;
}
- protected abstract String buildCommandFilePath();
-
- protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
-
- protected abstract String commandInterpreter();
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 406e78a80b..69f6aceb99 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -22,31 +22,17 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COL
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.List;
-import java.util.regex.Pattern;
+import java.util.Map;
-/**
- * abstract yarn task
- */
public abstract class AbstractYarnTask extends AbstractRemoteTask {
- /**
- * process task
- */
private ShellCommandExecutor shellCommandExecutor;
- /**
- * rules for extracting application ID
- */
- protected static final Pattern YARN_APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
-
- /**
- * Abstract Yarn Task
- *
- * @param taskRequest taskRequest
- */
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
@@ -58,8 +44,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
+ IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(getProperties())
+ // todo: do we need to move the replace to subclass?
+ .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
// SHELL task exit code
- TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
+ TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(response.getExitStatusCode());
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
@@ -115,10 +105,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
}
/**
- * create command
- *
- * @return String
+ * Get the script used to bootstrap the task
*/
- protected abstract String buildCommand();
+ protected abstract String getScript();
+ /**
+ * Get the properties of the task used to replace the placeholders in the script.
+ */
+ protected abstract Map<String, String> getProperties();
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
index 9bcc765bb3..2dbea62287 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
@@ -17,19 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
@@ -40,104 +27,10 @@ import org.slf4j.Logger;
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
- /**
- * For Unix-like, using bash
- */
- private static final String SH = "bash";
-
- /**
- * For Windows, using cmd.exe
- */
- private static final String CMD = "cmd.exe";
-
- /**
- * constructor
- *
- * @param logHandler logHandler
- * @param taskRequest taskRequest
- * @param logger logger
- */
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
- public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
- super(logBuffer);
- }
-
- @Override
- protected String buildCommandFilePath() {
- // command file
- return String.format("%s/%s.%s", taskRequest.getExecutePath(), taskRequest.getTaskAppId(),
- SystemUtils.IS_OS_WINDOWS ? "bat" : "command");
- }
-
- /**
- * create command file if not exists
- *
- * @param execCommand exec command
- * @param commandFile command file
- * @throws IOException io exception
- */
- @Override
- protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
- // create if non existence
- logger.info("Begin to create command file:{}", commandFile);
-
- Path commandFilePath = Paths.get(commandFile);
- if (Files.exists(commandFilePath)) {
- logger.warn("The command file: {} is already exist, will not create a again", commandFile);
- return;
- }
-
- StringBuilder sb = new StringBuilder();
- if (SystemUtils.IS_OS_WINDOWS) {
- sb.append("@echo off").append(System.lineSeparator());
- sb.append("cd /d %~dp0").append(System.lineSeparator());
- if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
- for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
- sb.append("call ").append(envSourceFile).append("\n");
- }
- }
- if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
- sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
- }
- } else {
- sb.append("#!/bin/bash").append(System.lineSeparator());
- sb.append("BASEDIR=$(cd `dirname $0`; pwd)").append(System.lineSeparator());
- sb.append("cd $BASEDIR").append(System.lineSeparator());
- if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
- for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
- sb.append("source ").append(envSourceFile).append("\n");
- }
- }
- if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
- sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
- }
- if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) {
- String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml();
- Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils
- .getKubeConfigPath(taskRequest.getExecutePath()));
- FileUtils.createFileWith755(kubeConfigPath);
- Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND);
- sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator());
- logger.info("Create kubernetes configuration file: {}.", kubeConfigPath);
- }
- }
- sb.append(execCommand);
- String commandContent = sb.toString();
-
- FileUtils.createFileWith755(commandFilePath);
- Files.write(commandFilePath, commandContent.getBytes(), StandardOpenOption.APPEND);
-
- logger.info("Success create command file, command: {}", commandContent);
- }
-
- @Override
- protected String commandInterpreter() {
- return SystemUtils.IS_OS_WINDOWS ? CMD : SH;
- }
-
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index b8d125e5a9..de9a727567 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -173,6 +173,7 @@ public class TaskExecutionContext implements Serializable {
/**
* definedParams
+ * todo: we need to rename definedParams, prepareParamsMap and paramsMap; the current names are confusing
*/
private Map<String, String> definedParams;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
new file mode 100644
index 0000000000..c0f03fdccd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base builder for shell interceptors that run on Linux.
+ * <p>
+ * It writes the final shell script to {@code shellDirectory/shellName + shellExtension()} and assembles the
+ * bootstrap command used to launch that script: through the interpreter in normal mode, or wrapped in
+ * {@code sudo} / {@code sudo systemd-run} when sudo mode is enabled.
+ */
+@Slf4j
+public abstract class BaseLinuxShellInterceptorBuilder<T extends BaseLinuxShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        extends
+            BaseShellInterceptorBuilder<T, Y> {
+
+    /** Quota value that means "no limit" for both the CPU and the memory quota. */
+    private static final int UNLIMITED_QUOTA = -1;
+
+    /**
+     * Assembles the final script and appends it to the shell file.
+     * <p>
+     * The script consists of, in order: the shell header, a {@code cd} into the script's own directory, one
+     * {@code source} line per system env file, the custom env scripts, an optional {@code KUBECONFIG} export
+     * and the script body with its parameter placeholders converted.
+     *
+     * @throws IOException if the shell file or the kubernetes configuration file cannot be created or written
+     */
+    protected void generateShellScript() throws IOException {
+        List<String> finalScripts = new ArrayList<>();
+        // add shell header
+        finalScripts.add(shellHeader());
+        // run the rest of the script from the directory that contains the script file
+        finalScripts.add("BASEDIR=$(cd `dirname $0`; pwd)");
+        finalScripts.add("cd $BASEDIR");
+        // add system env
+        finalScripts.addAll(systemEnvScript());
+        // add custom env
+        finalScripts.addAll(customEnvScript());
+        // add k8s config
+        finalScripts.addAll(k8sConfig());
+        // add shell body
+        finalScripts.add(shellBody());
+        // create shell file
+        String finalScript = String.join(System.lineSeparator(), finalScripts);
+        Path shellAbsolutePath = shellAbsolutePath();
+        // NOTE(review): presumably creates the file with 755 permissions so that it can be executed directly
+        // in sudo mode - confirm against FileUtils.
+        FileUtils.createFileWith755(shellAbsolutePath);
+        Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
+        log.info("Final Shell file is : \n{}", finalScript);
+    }
+
+    /**
+     * Builds the command line used to launch the generated shell file.
+     *
+     * @return the sudo based command when sudo mode is enabled, otherwise {@code <interpreter> <shell file>}
+     */
+    protected List<String> generateBootstrapCommand() {
+        return sudoEnable ? bootstrapCommandInSudoMode() : bootstrapCommandInNormalMode();
+    }
+
+    /**
+     * @return the first line of the generated shell file
+     */
+    protected abstract String shellHeader();
+
+    /**
+     * @return the executable that is given the shell file as its argument
+     */
+    protected abstract String shellInterpreter();
+
+    /**
+     * @return the suffix appended verbatim to the shell name to form the shell file name
+     */
+    protected abstract String shellExtension();
+
+    /** Turns every configured system env file into a {@code source <file>} line. */
+    private List<String> systemEnvScript() {
+        if (CollectionUtils.isEmpty(systemEnvs)) {
+            return Collections.emptyList();
+        }
+        return systemEnvs
+                .stream()
+                .map(systemEnv -> "source " + systemEnv)
+                .collect(Collectors.toList());
+    }
+
+    /** Returns the custom env scripts as they were appended, or an empty list when there are none. */
+    private List<String> customEnvScript() {
+        if (CollectionUtils.isEmpty(customEnvScripts)) {
+            return Collections.emptyList();
+        }
+        return customEnvScripts;
+    }
+
+    /**
+     * Writes the kubernetes configuration, if any, next to the shell file and returns the line exporting it.
+     *
+     * @return a single {@code export KUBECONFIG=...} line, or an empty list when no configuration is set
+     * @throws IOException if the configuration file cannot be created or written
+     */
+    private List<String> k8sConfig() throws IOException {
+        if (StringUtils.isEmpty(k8sConfigYaml)) {
+            return Collections.emptyList();
+        }
+        Path kubeConfigPath = Paths.get(FileUtils.getKubeConfigPath(shellDirectory));
+        FileUtils.createFileWith755(kubeConfigPath);
+        Files.write(kubeConfigPath, k8sConfigYaml.getBytes(), StandardOpenOption.APPEND);
+        log.info("Created kubernetes configuration file: {}.", kubeConfigPath);
+        return Collections.singletonList("export KUBECONFIG=" + kubeConfigPath);
+    }
+
+    /**
+     * Joins the appended scripts, normalizes Windows line endings to the platform line separator and converts
+     * the parameter placeholders using the configured properties.
+     */
+    private String shellBody() {
+        if (CollectionUtils.isEmpty(scripts)) {
+            return StringUtils.EMPTY;
+        }
+        // a literal replace is enough here, no regular expression is needed
+        String scriptBody = String.join(System.lineSeparator(), scripts)
+                .replace("\r\n", System.lineSeparator());
+        return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
+    }
+
+    /** Resolves the shell file as {@code shellDirectory/shellName + shellExtension()}. */
+    private Path shellAbsolutePath() {
+        return Paths.get(shellDirectory, shellName + shellExtension());
+    }
+
+    /**
+     * Builds {@code sudo [-u <runUser>] -E <shell file>}, or delegates to the resource limit command when the
+     * task resource limit property is switched on. The shell file is executed directly, without the interpreter.
+     */
+    private List<String> bootstrapCommandInSudoMode() {
+        if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
+            return bootstrapCommandInResourceLimitMode();
+        }
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add("sudo");
+        if (StringUtils.isNotBlank(runUser)) {
+            bootstrapCommand.add("-u");
+            bootstrapCommand.add(runUser);
+        }
+        bootstrapCommand.add("-E");
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    /** Builds {@code <interpreter> <shell file>}. */
+    private List<String> bootstrapCommandInNormalMode() {
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add(shellInterpreter());
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    /**
+     * Builds the systemd based command, e.g.
+     * {@code sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root bash <shell file>}.
+     * <p>
+     * A quota that is unset or {@code -1} is treated as unlimited.
+     */
+    private List<String> bootstrapCommandInResourceLimitMode() {
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add("sudo");
+        bootstrapCommand.add("systemd-run");
+        bootstrapCommand.add("-q");
+        bootstrapCommand.add("--scope");
+
+        bootstrapCommand.add("-p");
+        bootstrapCommand.add(isUnlimited(cpuQuota) ? "CPUQuota=" : String.format("CPUQuota=%s%%", cpuQuota));
+
+        // use `man systemd.resource-control` to find available parameter
+        bootstrapCommand.add("-p");
+        bootstrapCommand.add(isUnlimited(memoryQuota)
+                ? String.format("MemoryLimit=%s", "infinity")
+                : String.format("MemoryLimit=%sM", memoryQuota));
+
+        bootstrapCommand.add(String.format("--uid=%s", runUser));
+        // systemd-run needs the program to launch; without these two entries the command has nothing to run
+        bootstrapCommand.add(shellInterpreter());
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    /** Returns whether the quota is unset or carries the "no limit" marker; avoids unboxing a null quota. */
+    private static boolean isUnlimited(Integer quota) {
+        return quota == null || quota == UNLIMITED_QUOTA;
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java
new file mode 100644
index 0000000000..0f82956204
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Base shell interceptor that launches a prepared command line as an operating system process.
+ */
+@Slf4j
+public abstract class BaseShellInterceptor implements IShellInterceptor {
+
+    protected final String workingDirectory;
+    protected final List<String> executeCommands;
+
+    /**
+     * @param executeCommands  the program and its arguments, handed to {@link ProcessBuilder} unchanged
+     * @param workingDirectory the directory the process is started in
+     */
+    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
+        this.executeCommands = executeCommands;
+        this.workingDirectory = workingDirectory;
+    }
+
+    /**
+     * Starts the configured command in the working directory, with the error stream merged into the
+     * standard output stream.
+     *
+     * @return the started process
+     * @throws IOException if the process cannot be started
+     */
+    @Override
+    public Process execute() throws IOException {
+        File processDirectory = new File(workingDirectory);
+        ProcessBuilder launcher = new ProcessBuilder()
+                // setting up a working directory
+                .directory(processDirectory)
+                // merge error information to standard output stream
+                .redirectErrorStream(true)
+                .command(executeCommands);
+        String commandLine = String.join(" ", executeCommands);
+        log.info("Executing shell command : {}", commandLine);
+        return launcher.start();
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java
new file mode 100644
index 0000000000..329e79661c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base implementation of {@link IShellInterceptorBuilder} that stores every builder setting in a field
+ * and exposes fluent setters returning the concrete builder type {@code T}.
+ *
+ * @param <T> the concrete builder type (self type), returned by every fluent setter
+ * @param <Y> the interceptor type produced by {@link #build()}
+ */
+public abstract class BaseShellInterceptorBuilder<T extends BaseShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        implements
+            IShellInterceptorBuilder<T, Y> {
+
+    protected String shellDirectory;
+
+    protected String shellName;
+
+    protected String runUser;
+
+    protected Integer cpuQuota;
+
+    protected Integer memoryQuota;
+
+    protected List<String> systemEnvs = new ArrayList<>();
+
+    protected List<String> customEnvScripts = new ArrayList<>();
+
+    protected String k8sConfigYaml;
+
+    protected Map<String, String> propertyMap = new HashMap<>();
+
+    protected boolean sudoEnable;
+
+    protected List<String> scripts = new ArrayList<>();
+
+    protected BaseShellInterceptorBuilder() {
+    }
+
+    /**
+     * Creates a new builder of the same type carrying a copy of the settings of {@code builder}.
+     * The list and map settings are copied, not shared, so appending to the returned builder
+     * never changes {@code builder} (and vice versa).
+     *
+     * @param builder the builder whose settings are copied
+     * @return a new, independent builder with the same settings
+     */
+    @Override
+    public T newBuilder(T builder) {
+        T copy = newBuilder();
+        copy.shellDirectory = builder.shellDirectory;
+        copy.shellName = builder.shellName;
+        copy.runUser = builder.runUser;
+        copy.cpuQuota = builder.cpuQuota;
+        copy.memoryQuota = builder.memoryQuota;
+        // defensive copies: sharing the mutable collections would let one builder mutate the other
+        copy.systemEnvs = new ArrayList<>(builder.systemEnvs);
+        copy.customEnvScripts = new ArrayList<>(builder.customEnvScripts);
+        copy.k8sConfigYaml = builder.k8sConfigYaml;
+        copy.propertyMap = new HashMap<>(builder.propertyMap);
+        copy.sudoEnable = builder.sudoEnable;
+        copy.scripts = new ArrayList<>(builder.scripts);
+        return copy;
+    }
+
+    /** Sets the directory used for the shell file and returns this builder. */
+    @Override
+    public T shellDirectory(String shellDirectory) {
+        this.shellDirectory = shellDirectory;
+        return self();
+    }
+
+    /** Sets the name of the shell file and returns this builder. */
+    @Override
+    public T shellName(String shellFilename) {
+        this.shellName = shellFilename;
+        return self();
+    }
+
+    /** Sets the user recorded as {@code runUser} and returns this builder. */
+    @Override
+    public T runUser(String systemUser) {
+        this.runUser = systemUser;
+        return self();
+    }
+
+    /** Sets the cpu quota and returns this builder. */
+    @Override
+    public T cpuQuota(Integer cpuQuota) {
+        this.cpuQuota = cpuQuota;
+        return self();
+    }
+
+    /** Sets the memory quota and returns this builder. */
+    @Override
+    public T memoryQuota(Integer memoryQuota) {
+        this.memoryQuota = memoryQuota;
+        return self();
+    }
+
+    /** Appends one entry to the system env list and returns this builder. */
+    @Override
+    public T appendSystemEnv(String envFiles) {
+        systemEnvs.add(envFiles);
+        return self();
+    }
+
+    /** Appends one entry to the custom env script list and returns this builder. */
+    @Override
+    public T appendCustomEnvScript(String customEnvScript) {
+        customEnvScripts.add(customEnvScript);
+        return self();
+    }
+
+    /** Sets the k8s config yaml and returns this builder. */
+    @Override
+    public T k8sConfigYaml(String k8sConfigYaml) {
+        this.k8sConfigYaml = k8sConfigYaml;
+        return self();
+    }
+
+    /**
+     * Merges the given entries into the properties collected so far; a {@code null} or empty map is ignored.
+     *
+     * @param propertyMap entries to add, may be {@code null}
+     * @return this builder
+     */
+    @Override
+    public T properties(Map<String, String> propertyMap) {
+        if (MapUtils.isNotEmpty(propertyMap)) {
+            this.propertyMap.putAll(propertyMap);
+        }
+        return self();
+    }
+
+    /** Sets the sudo flag and returns this builder. */
+    @Override
+    public T sudoMode(boolean sudoEnable) {
+        this.sudoEnable = sudoEnable;
+        return self();
+    }
+
+    /** Appends one script fragment and returns this builder. */
+    @Override
+    public T appendScript(String script) {
+        scripts.add(script);
+        return self();
+    }
+
+    /**
+     * Returns this builder typed as {@code T}.
+     * The cast is unchecked; it holds as long as each concrete subclass passes itself as {@code T}.
+     */
+    @SuppressWarnings("unchecked")
+    private T self() {
+        return (T) this;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java
new file mode 100644
index 0000000000..9d31bf34c3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Builder base for Windows shells: assembles a batch-style script file from the collected settings
+ * and produces the command used to launch it. Sudo and k8s config are not supported here and only
+ * trigger a warning when they were requested.
+ *
+ * @param <T> the concrete builder type
+ * @param <Y> the interceptor type produced by the builder
+ */
+@Slf4j
+public abstract class BaseWindowsShellInterceptorBuilder<T extends BaseWindowsShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
+        extends
+            BaseShellInterceptorBuilder<T, Y> {
+
+    /**
+     * Writes the final script file: header, a {@code cd} into the script's directory, system env,
+     * custom env, and the script body, joined with the platform line separator.
+     *
+     * @throws IOException if the script file cannot be created or written
+     */
+    protected void generateShellScript() throws IOException {
+        List<String> finalScripts = new ArrayList<>();
+        // add shell header
+        finalScripts.add(shellHeader());
+        // %~dp0 expands to the drive and directory of the running batch file
+        finalScripts.add("cd /d %~dp0");
+        // add system env
+        finalScripts.addAll(systemEnvScript());
+        // add custom env
+        finalScripts.addAll(customEnvScript());
+        // add k8s config
+        finalScripts.addAll(k8sConfig());
+        // add shell body
+        finalScripts.add(shellBody());
+        // create shell file
+        String finalScript = String.join(System.lineSeparator(), finalScripts);
+        Path shellAbsolutePath = shellAbsolutePath();
+        FileUtils.createFileWith755(shellAbsolutePath);
+        // getBytes() without an argument encodes with the platform default charset
+        Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
+        log.info("Final Shell file is : \n{}", finalScript);
+    }
+
+    /** Joins the script fragments and replaces parameter placeholders using the collected properties. */
+    private String shellBody() {
+        if (CollectionUtils.isEmpty(scripts)) {
+            return StringUtils.EMPTY;
+        }
+        String scriptBody = String.join(System.lineSeparator(), scripts);
+        return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
+    }
+
+    /** Always contributes nothing; warns only when a k8s config was actually supplied. */
+    private Collection<String> k8sConfig() {
+        if (StringUtils.isNotBlank(k8sConfigYaml)) {
+            log.warn("k8s config is not supported in windows");
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Builds the command line that launches the generated script: the interpreter followed by the script path.
+     *
+     * @return the command tokens
+     */
+    protected List<String> generateBootstrapCommand() {
+        if (sudoEnable) {
+            log.warn("sudo is not supported in windows");
+        }
+        // todo: support tenant in windows
+        // NOTE(review): cmd.exe normally needs "/c" to run a script and then exit - confirm this launch form works
+        List<String> bootstrapCommand = new ArrayList<>();
+        bootstrapCommand.add(shellInterpreter());
+        bootstrapCommand.add(shellAbsolutePath().toString());
+        return bootstrapCommand;
+    }
+
+    /** @return the first line written to the script file */
+    protected abstract String shellHeader();
+
+    /** @return the executable placed first in the bootstrap command */
+    protected abstract String shellInterpreter();
+
+    /** @return the suffix appended to the shell name to form the script file name */
+    protected abstract String shellExtension();
+
+    /** Turns every system env entry into a {@code call <entry>} line. */
+    private List<String> systemEnvScript() {
+        if (CollectionUtils.isEmpty(systemEnvs)) {
+            return Collections.emptyList();
+        }
+        return systemEnvs.stream()
+                .map(systemEnv -> "call " + systemEnv)
+                .collect(Collectors.toList());
+    }
+
+    /** Returns the custom env lines unchanged, or an empty list when none were added. */
+    private List<String> customEnvScript() {
+        if (CollectionUtils.isEmpty(customEnvScripts)) {
+            return Collections.emptyList();
+        }
+        return customEnvScripts;
+    }
+
+    /** Resolves the script file as {@code shellDirectory/shellName + extension}. */
+    private Path shellAbsolutePath() {
+        return Paths.get(shellDirectory, shellName + shellExtension());
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java
new file mode 100644
index 0000000000..45b9bf96f7
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.IOException;
+
+/**
+ * This interface is used to execute shell commands.
+ * Instances should be created by an {@link IShellInterceptorBuilder}.
+ */
+public interface IShellInterceptor {
+
+ /**
+ * Executes the shell command this interceptor was built with.
+ *
+ * @return the {@link Process} handle of the executed command
+ * @throws IOException if the command cannot be executed
+ */
+ Process execute() throws IOException;
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java
new file mode 100644
index 0000000000..cab50e3bdf
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Fluent builder for {@link IShellInterceptor} instances.
+ * Every setter returns the builder type {@code T} so that calls can be chained.
+ *
+ * @param <T> the concrete builder type
+ * @param <Y> the interceptor type produced by {@link #build()}
+ */
+public interface IShellInterceptorBuilder<T extends IShellInterceptorBuilder<T, Y>, Y extends IShellInterceptor> {
+
+ /** Creates a new builder of type {@code T}. */
+ T newBuilder();
+
+ /** Creates a new builder of type {@code T} that carries over the settings of {@code builder}. */
+ T newBuilder(T builder);
+
+ /** Sets the directory of the shell file. */
+ T shellDirectory(String directory);
+
+ /** Sets the name of the shell file. */
+ T shellName(String shellFilename);
+
+ /** Sets the user to run as (presumably an operating-system user; verify against implementations). */
+ T runUser(String systemUser);
+
+ /** Sets the cpu quota; unit and enforcement are up to the implementation. */
+ T cpuQuota(Integer cpuQuota);
+
+ /** Sets the memory quota; unit and enforcement are up to the implementation. */
+ T memoryQuota(Integer memoryQuota);
+
+ /** Adds one system env entry; may be called repeatedly. */
+ T appendSystemEnv(String envFiles);
+
+ /** Adds one custom env script line; may be called repeatedly. */
+ T appendCustomEnvScript(String customEnvScript);
+
+ /** Sets the k8s config yaml; not every shell type supports it. */
+ T k8sConfigYaml(String k8sConfigYaml);
+
+ /** Adds properties, which implementations use to replace parameter placeholders in the script. */
+ T properties(Map<String, String> propertyMap);
+
+ /** Enables or disables sudo mode; not every shell type supports it. */
+ T sudoMode(boolean sudoEnable);
+
+ /** Adds one script fragment to the script body; may be called repeatedly. */
+ T appendScript(String script);
+
+ /**
+ * Builds the interceptor from the collected settings.
+ *
+ * @throws IOException if preparing the interceptor fails, e.g. while writing the shell file
+ */
+ Y build() throws IOException;
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java
new file mode 100644
index 0000000000..9654d091cd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell;
+
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.cmd.CmdShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.sh.ShShellInterceptorBuilder;
+
+/**
+ * Creates the {@link IShellInterceptorBuilder} matching the configured shell type.
+ * The type is read once, when this class is initialized, from the {@code shell.interceptor.type}
+ * property (with {@code "bash"} passed as the fallback value) and is matched case-insensitively.
+ */
+public class ShellInterceptorBuilderFactory {
+
+    private static final String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
+
+    /**
+     * @return a new builder for the configured shell type ({@code bash}, {@code sh} or {@code cmd})
+     * @throws IllegalArgumentException if the configured type is none of the supported ones
+     */
+    @SuppressWarnings("unchecked")
+    public static IShellInterceptorBuilder newBuilder() {
+        final IShellInterceptorBuilder selected;
+        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
+            selected = new BashShellInterceptorBuilder();
+        } else if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
+            selected = new ShShellInterceptorBuilder();
+        } else if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
+            selected = new CmdShellInterceptorBuilder();
+        } else {
+            throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
+        }
+        return selected;
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java
new file mode 100644
index 0000000000..f0a237c742
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * {@link BaseShellInterceptor} produced by the bash builder; adds no behavior of its own.
+ */
+public class BashShellInterceptor extends BaseShellInterceptor {
+
+ /**
+ * @param executeCommands command tokens passed on to the base interceptor
+ * @param workingDirectory working directory passed on to the base interceptor
+ */
+ public BashShellInterceptor(List<String> executeCommands, String workingDirectory) {
+ super(executeCommands, workingDirectory);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java
new file mode 100644
index 0000000000..15ffd2cc88
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link BashShellInterceptor}: supplies the bash-specific header, interpreter and
+ * file extension to the Linux base builder.
+ */
+public class BashShellInterceptorBuilder
+        extends
+            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
+
+    /** First line of the generated script. */
+    @Override
+    protected String shellHeader() {
+        return "#!/bin/bash";
+    }
+
+    /** Interpreter name used by the base builder. */
+    @Override
+    protected String shellInterpreter() {
+        return "bash";
+    }
+
+    /** Extension of the generated script file. */
+    @Override
+    protected String shellExtension() {
+        return ".sh";
+    }
+
+    @Override
+    public BashShellInterceptorBuilder newBuilder() {
+        return new BashShellInterceptorBuilder();
+    }
+
+    /**
+     * Generates the shell script first, then wraps the bootstrap command in an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException if generating the shell script fails
+     */
+    @Override
+    public BashShellInterceptor build() throws IOException {
+        generateShellScript();
+        return new BashShellInterceptor(generateBootstrapCommand(), shellDirectory);
+    }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java
new file mode 100644
index 0000000000..cb4d8b05d6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * {@link BaseShellInterceptor} produced by the cmd builder; adds no behavior of its own.
+ */
+public class CmdShellInterceptor extends BaseShellInterceptor {
+
+ /**
+ * @param executeCommands command tokens passed on to the base interceptor
+ * @param workingDirectory working directory passed on to the base interceptor
+ */
+ protected CmdShellInterceptor(List<String> executeCommands, String workingDirectory) {
+ super(executeCommands, workingDirectory);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java
new file mode 100644
index 0000000000..61f7725fe0
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseWindowsShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link CmdShellInterceptor}: supplies the cmd-specific header, interpreter and
+ * file extension to the Windows base builder.
+ */
+public class CmdShellInterceptorBuilder
+        extends
+            BaseWindowsShellInterceptorBuilder<CmdShellInterceptorBuilder, CmdShellInterceptor> {
+
+    @Override
+    public CmdShellInterceptorBuilder newBuilder() {
+        return new CmdShellInterceptorBuilder();
+    }
+
+    /**
+     * Generates the script file first, then wraps the bootstrap command in an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException if the script file cannot be written
+     */
+    @Override
+    public CmdShellInterceptor build() throws IOException {
+        generateShellScript();
+        return new CmdShellInterceptor(generateBootstrapCommand(), shellDirectory);
+    }
+
+    /** First line of the generated script. */
+    @Override
+    protected String shellHeader() {
+        return "@echo off";
+    }
+
+    /** Executable placed first in the bootstrap command. */
+    @Override
+    protected String shellInterpreter() {
+        return "cmd.exe";
+    }
+
+    /** Extension appended to the shell name to form the script file name. */
+    @Override
+    protected String shellExtension() {
+        return ".bat";
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java
new file mode 100644
index 0000000000..fae2a8462c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
+
+import java.util.List;
+
+/**
+ * {@link BaseShellInterceptor} produced by the sh builder; adds no behavior of its own.
+ */
+public class ShShellInterceptor extends BaseShellInterceptor {
+
+ /**
+ * @param executeCommands command tokens passed on to the base interceptor
+ * @param shellDirectory passed on to the base interceptor as its working directory
+ */
+ protected ShShellInterceptor(List<String> executeCommands, String shellDirectory) {
+ super(executeCommands, shellDirectory);
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java
new file mode 100644
index 0000000000..cef1e5843a
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
+
+import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Builder for {@link ShShellInterceptor}: supplies the sh-specific header, interpreter and
+ * file extension to the Linux base builder.
+ */
+public class ShShellInterceptorBuilder
+        extends
+            BaseLinuxShellInterceptorBuilder<ShShellInterceptorBuilder, ShShellInterceptor> {
+
+    @Override
+    public ShShellInterceptorBuilder newBuilder() {
+        return new ShShellInterceptorBuilder();
+    }
+
+    /**
+     * Delegates to the inherited setter and returns the builder.
+     * A fluent setter must never return {@code null}: callers chain further calls on the result,
+     * so a {@code null} return would fail with a {@link NullPointerException} at the next call.
+     *
+     * @param k8sConfigYaml the k8s config yaml
+     * @return this builder
+     */
+    @Override
+    public ShShellInterceptorBuilder k8sConfigYaml(String k8sConfigYaml) {
+        return super.k8sConfigYaml(k8sConfigYaml);
+    }
+
+    /**
+     * Generates the shell script first, then wraps the bootstrap command in an interceptor
+     * whose working directory is the shell directory.
+     *
+     * @throws IOException if generating the shell script fails
+     */
+    @Override
+    public ShShellInterceptor build() throws IOException {
+        generateShellScript();
+        List<String> bootstrapCommand = generateBootstrapCommand();
+        return new ShShellInterceptor(bootstrapCommand, shellDirectory);
+    }
+
+    /** First line of the generated script. */
+    @Override
+    protected String shellHeader() {
+        return "#!/bin/sh";
+    }
+
+    /** Interpreter name used by the base builder. */
+    @Override
+    protected String shellInterpreter() {
+        return "sh";
+    }
+
+    /** Extension of the generated script file. */
+    @Override
+    protected String shellExtension() {
+        return ".sh";
+    }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 92bd3ea69e..2e4731d704 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.task.chunjun;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -30,25 +29,21 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* chunjun task
@@ -102,19 +97,16 @@ public class ChunJunTask extends AbstractTask {
}
}
- /**
- * run chunjun process
- *
- * @throws TaskException exception
- */
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- String jsonFilePath = buildChunJunJsonFile(paramsMap);
- String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
- TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(paramsMap))
+ .appendScript(buildCommand(buildChunJunJsonFile(paramsMap)));
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
@@ -167,25 +159,7 @@ public class ChunJunTask extends AbstractTask {
return fileName;
}
- /**
- * create command
- *
- * @return shell command file name
- * @throws Exception if error throws Exception
- */
- private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
- // generate scripts
- String fileName = String.format("%s/%s_node.%s",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId(),
- SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
- Path path = new File(fileName).toPath();
-
- if (Files.exists(path)) {
- return fileName;
- }
-
+ private String buildCommand(String jobConfigFilePath) {
// chunjun command
List<String> args = new ArrayList<>();
@@ -215,24 +189,7 @@ public class ChunJunTask extends AbstractTask {
String command = String.join(" ", args);
- // replace placeholder
- String chunjunCommand = ParameterUtils.convertParameterPlaceholders(command, ParameterUtils.convert(paramsMap));
-
- log.info("raw script : {}", chunjunCommand);
-
- // create shell command file
- Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
- FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
- if (SystemUtils.IS_OS_WINDOWS) {
- Files.createFile(path);
- } else {
- Files.createFile(path, attr);
- }
-
- Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND);
-
- return fileName;
+ return command;
}
public String getExecMode(ChunJunParameters chunJunParameters) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 5fe451d916..381b8cbd37 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.dataquality.DataQualityParameters;
@@ -57,6 +56,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* In DataQualityTask, the input parameters will be converted into DataQualityConfiguration,
@@ -160,19 +160,16 @@ public class DataQualityTask extends AbstractYarnTask {
}
@Override
- protected String buildCommand() {
+ protected String getScript() {
List<String> args = new ArrayList<>();
-
args.add(SPARK_COMMAND);
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
+ return args.stream().collect(Collectors.joining(" "));
+ }
- // replace placeholder
- Map<String, Property> paramsMap = dqTaskExecutionContext.getPrepareParamsMap();
- String command =
- ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
- log.info("data quality task command: {}", command);
-
- return command;
+ @Override
+ protected Map<String, String> getProperties() {
+ return ParameterUtils.convert(dqTaskExecutionContext.getPrepareParamsMap());
}
protected void setMainJarName() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 4477b42eb7..80de6cc7cd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -33,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -40,16 +41,11 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -59,7 +55,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import com.alibaba.druid.sql.ast.SQLStatement;
@@ -152,21 +147,18 @@ public class DataxTask extends AbstractTask {
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
- /**
- * run DataX process
- *
- * @throws TaskException if error throws Exception
- */
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// replace placeholders, and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- // run datax processDataSourceService
- String jsonFilePath = buildDataxJsonFile(paramsMap);
- String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
- TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(paramsMap))
+ .appendScript(buildCommand(buildDataxJsonFile(paramsMap), paramsMap));
+
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
@@ -385,23 +377,10 @@ public class DataxTask extends AbstractTask {
* create command
*
* @return the DataX shell command line
- * @throws Exception if error throws Exception
*/
- private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
- // generate scripts
- String fileName = String.format("%s/%s_node.%s",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId(),
- SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
- Path path = new File(fileName).toPath();
-
- if (Files.exists(path)) {
- return fileName;
- }
-
+ protected String buildCommand(String jobConfigFilePath, Map<String, Property> paramsMap) {
// datax python command
- String sbr = DATAX_PYTHON +
+ return DATAX_PYTHON +
" " +
DATAX_PATH +
" " +
@@ -409,25 +388,6 @@ public class DataxTask extends AbstractTask {
addCustomParameters(paramsMap) +
" " +
jobConfigFilePath;
-
- // replace placeholder
- String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParameterUtils.convert(paramsMap));
-
- log.debug("raw script : {}", dataxCommand);
-
- // create shell command file
- Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
- FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-
- if (SystemUtils.IS_OS_WINDOWS) {
- Files.createFile(path);
- } else {
- Files.createFile(path, attr);
- }
-
- Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
-
- return fileName;
}
private StringBuilder addCustomParameters(Map<String, Property> paramsMap) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
index d5054e836d..e423fcb89d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.datax;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -40,8 +41,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
-import org.apache.commons.lang3.SystemUtils;
-
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -110,7 +109,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
+ when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -122,14 +121,8 @@ public class DataxTaskTest {
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
- File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
- : new File("/tmp/execution/app-id_node.sh");
- InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
- String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
- Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
- " /tmp/execution/app-id_job.json");
- delete = shellCommandFile.delete();
- Assertions.assertTrue(delete);
+ Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", null),
+ "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" /tmp/execution/app-id_job.json");
}
@Test
@@ -151,7 +144,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
- when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
+ when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@@ -163,14 +156,8 @@ public class DataxTaskTest {
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
- File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
- : new File("/tmp/execution/app-id_node.sh");
- InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
- String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
- Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
- "-p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
- delete = shellCommandFile.delete();
- Assertions.assertTrue(delete);
+ Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", createPrepareParamsMap()),
+ "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" -p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
}
@Test
@@ -187,7 +174,7 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
- when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+ when(shellCommandExecutor.run(any(), eq(taskCallBack)))
.thenThrow(new InterruptedException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}
@@ -206,7 +193,7 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
- when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
+ when(shellCommandExecutor.run(any(), eq(taskCallBack)))
.thenThrow(new IOException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 37d2d2f8f9..032d953858 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import java.util.ArrayList;
import java.util.List;
@@ -78,8 +80,9 @@ public class DvcTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
- String command = buildCommand();
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(buildCommand());
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index bc53696077..3a71df62c7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public class FlinkStreamTask extends FlinkTask implements StreamTask {
@@ -66,15 +67,15 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
* @return command
*/
@Override
- protected String buildCommand() {
+ protected String getScript() {
// flink run/run-application [OPTIONS] <jar-file> <arguments>
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+ return args.stream().collect(Collectors.joining(" "));
+ }
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
- log.info("flink task command : {}", command);
- return command;
+ @Override
+ protected Map<String, String> getProperties() {
+ return taskExecutionContext.getDefinedParams();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 33afd7fc7f..75764c3677 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
public class FlinkTask extends AbstractYarnTask {
@@ -69,15 +70,15 @@ public class FlinkTask extends AbstractYarnTask {
* @return command
*/
@Override
- protected String buildCommand() {
+ protected String getScript() {
// flink run/run-application [OPTIONS] <jar-file> <arguments>
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
+ return args.stream().collect(Collectors.joining(" "));
+ }
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
-
- log.info("flink task command : {}", command);
- return command;
+ @Override
+ protected Map<String, String> getProperties() {
+ return taskExecutionContext.getDefinedParams();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 8ba8409567..28443423c2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@@ -88,7 +90,9 @@ public class HiveCliTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(buildCommand());
+ final TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
index 9486ba5003..e9850b6459 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
@@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@@ -122,7 +124,9 @@ public class JavaTask extends AbstractTask {
throw new RunTypeNotFoundException("run type is required, but it is null now.");
}
Preconditions.checkNotNull(command, "command not be null.");
- TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(command);
+ TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
log.info("java task run result: {}", taskResponse);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index e3058ee329..735b6237e2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.StringUtils;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -82,7 +84,11 @@ public class JupyterTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+ .appendScript(buildCommand());
+
+ TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
@@ -147,14 +153,7 @@ public class JupyterTask extends AbstractRemoteTask {
args.add(String.format(JupyterConstants.REMOVE_ENV, timestamp));
}
- // replace placeholder
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- String command = ParameterUtils
- .convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
-
- log.info("jupyter task command: {}", command);
-
- return command;
+ return args.stream().collect(Collectors.joining(" "));
}
protected String readCondaPath() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
index 132e6f6e56..c091ee6f64 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
@@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.BooleanUtils;
@@ -37,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -100,8 +100,10 @@ public class LinkisTask extends AbstractRemoteTask {
public void submitApplication() throws TaskException {
try {
// construct process
- String command = buildCommand();
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+ .appendScript(buildCommand());
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
@@ -127,7 +129,9 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.STATUS_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(command);
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) {
@@ -160,7 +164,10 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.KILL_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
- shellCommandExecutor.run(command, null);
+
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(command);
+ shellCommandExecutor.run(shellActuatorBuilder, null);
setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -176,7 +183,7 @@ public class LinkisTask extends AbstractRemoteTask {
List<String> args = new ArrayList<>();
args.addAll(buildOptions());
- String command = String.join(Constants.SPACE, args);
+ String command = String.join(" ", args);
log.info("Linkis task command: {}", command);
return command;
@@ -187,20 +194,13 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.ASYNC_OPTIONS);
if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) {
- args.add(buildCustomConfigContent());
+ args.add(linkisParameters.getRawScript());
} else {
args.add(buildParamConfigContent());
}
return args;
}
- private String buildCustomConfigContent() {
- log.info("raw custom config content : {}", linkisParameters.getRawScript());
- String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n");
- script = parseScript(script);
- return script;
- }
-
private String buildParamConfigContent() {
log.info("raw param config content : {}", linkisParameters.getParamScript());
String script = "";
@@ -210,7 +210,6 @@ public class LinkisTask extends AbstractRemoteTask {
.concat(Constants.SPACE)
.concat(param.getValue());
}
- script = parseScript(script);
return script;
}
@@ -248,8 +247,4 @@ public class LinkisTask extends AbstractRemoteTask {
return linkisParameters;
}
- private String parseScript(String script) {
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
- }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index e4fe074c05..3c1f73747c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* shell task
@@ -110,12 +113,15 @@ public class MlflowTask extends AbstractTask {
}
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
- String command = buildCommand();
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(getParamsMap()))
+ .appendScript(buildCommand());
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
int exitCode;
if (mlflowParameters.getIsDeployDocker()) {
exitCode = checkDockerHealth();
@@ -165,7 +171,6 @@ public class MlflowTask extends AbstractTask {
*/
private String buildCommandForMlflowProjects() {
- Map<String, Property> paramsMap = getParamsMap();
List<String> args = new ArrayList<>();
args.add(
String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@@ -219,8 +224,7 @@ public class MlflowTask extends AbstractTask {
runCommand = runCommand + " " + versionString;
}
args.add(runCommand);
-
- return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+ return args.stream().collect(Collectors.joining("\n"));
}
/**
@@ -228,7 +232,6 @@ public class MlflowTask extends AbstractTask {
*/
protected String buildCommandForMlflowModels() {
- Map<String, Property> paramsMap = getParamsMap();
List<String> args = new ArrayList<>();
args.add(
String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@@ -247,8 +250,7 @@ public class MlflowTask extends AbstractTask {
args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(),
imageName));
}
-
- return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+ return args.stream().collect(Collectors.joining("\n"));
}
private Map<String, Property> getParamsMap() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index 2ae5c6e707..dd51314dfa 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* mapreduce task
@@ -90,19 +91,19 @@ public class MapReduceTask extends AbstractYarnTask {
* @return command
*/
@Override
- protected String buildCommand() {
+ protected String getScript() {
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAPREDUCE_COMMAND);
// other parameters
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext));
+ return args.stream().collect(Collectors.joining(" "));
+ }
- String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
- taskExecutionContext.getDefinedParams());
- log.info("mapreduce task command: {}", command);
-
- return command;
+ @Override
+ protected Map<String, String> getProperties() {
+ return taskExecutionContext.getDefinedParams();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index f60d5f1107..4b4d8cbd66 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@@ -86,6 +88,7 @@ public class PythonTask extends AbstractTask {
}
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
@@ -96,9 +99,11 @@ public class PythonTask extends AbstractTask {
// create this file
createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile);
- String command = buildPythonExecuteCommand(pythonScriptFile);
- TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(buildPythonExecuteCommand(pythonScriptFile));
+
+ TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
@@ -165,9 +170,8 @@ public class PythonTask extends AbstractTask {
* build python script content
*
* @return raw python script
- * @throws Exception exception
*/
- protected String buildPythonScriptContent() throws Exception {
+ protected String buildPythonScriptContent() {
log.info("raw python script : {}", pythonParameters.getRawScript());
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
Map<String, Property> paramsMap = mergeParamsWithContext(pythonParameters);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index afe371628c..806750877c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -24,14 +24,15 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
public class PytorchTask extends AbstractTask {
@@ -64,11 +65,15 @@ public class PytorchTask extends AbstractTask {
pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion());
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- String command = buildPythonExecuteCommand();
- TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
+ .appendScript(buildPythonExecuteCommand());
+
+ TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
@@ -116,9 +121,7 @@ public class PytorchTask extends AbstractTask {
args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath()));
}
-
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
+ return args.stream().collect(Collectors.joining("\n"));
}
private String getPythonCommand() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index bbf42ed8fa..0b837fa568 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@@ -100,7 +102,10 @@ public class SeatunnelTask extends AbstractRemoteTask {
try {
// construct process
String command = buildCommand();
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .appendScript(command);
+
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 7d8fdd2e5a..c3c2bf62a1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -19,26 +19,18 @@ package org.apache.dolphinscheduler.plugin.task.shell;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
+import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
-import org.apache.commons.lang3.SystemUtils;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.Map;
-
/**
* shell task
*/
@@ -84,12 +76,15 @@ public class ShellTask extends AbstractTask {
}
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
- // construct process
- String command = buildCommand();
- TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
+ IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
+ .properties(ParameterUtils.convert(shellParameters.getLocalParametersMap()))
+ .appendScript(shellParameters.getRawScript());
+
+ TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
@@ -115,47 +110,9 @@ public class ShellTask extends AbstractTask {
}
}
- /**
- * create command
- *
- * @return file name
- * @throws Exception exception
- */
- private String buildCommand() throws Exception {
- // generate scripts
- String fileName = String.format("%s/%s_node.%s",
- taskExecutionContext.getExecutePath(),
- taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
-
- File file = new File(fileName);
- Path path = file.toPath();
-
- if (Files.exists(path)) {
- // this shouldn't happen
- log.warn("The command file: {} is already exist", path);
- return fileName;
- }
-
- String script = shellParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
- script = parseScript(script);
- shellParameters.setRawScript(script);
-
- log.info("raw script : {}", shellParameters.getRawScript());
- log.info("task execute path : {}", taskExecutionContext.getExecutePath());
-
- FileUtils.createFileWith755(path);
- Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-
- return fileName;
- }
-
@Override
public AbstractParameters getParameters() {
return shellParameters;
}
- private String parseScript(String script) {
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
- }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 675320d49f..ef0b71c8fe 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import io.fabric8.kubernetes.client.Config;
@@ -88,13 +89,8 @@ public class SparkTask extends AbstractYarnTask {
log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
}
- /**
- * create command
- *
- * @return command
- */
@Override
- protected String buildCommand() {
+ protected String getScript() {
/**
* (1) spark-submit [options] <app jar | python file> [app arguments]
* (2) spark-sql [options] -f <filename>
@@ -116,14 +112,12 @@ public class SparkTask extends AbstractYarnTask {
args.addAll(populateSparkOptions());
// replace placeholder
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-
- String command =
- ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
-
- log.info("spark task command: {}", command);
+ return args.stream().collect(Collectors.joining(" "));
+ }
- return command;
+ @Override
+ protected Map<String, String> getProperties() {
+ return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 14ccbcd56f..ba031bb304 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -54,7 +54,7 @@ public class SparkTaskTest {
"--conf spark.executor.memory=1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql",
- sparkTask.buildCommand());
+ sparkTask.getScript());
}
@Test
@@ -79,7 +79,7 @@ public class SparkTaskTest {
"--conf spark.executor.memory=1G " +
"--name spark " +
"/lib/dolphinscheduler-task-spark.jar",
- sparkTask.buildCommand());
+ sparkTask.getScript());
}
private String buildSparkParametersWithSparkSql() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index b8d35e4fdf..016a07a56b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator;
@@ -72,17 +71,16 @@ public class SqoopTask extends AbstractYarnTask {
}
@Override
- protected String buildCommand() {
+ protected String getScript() {
// get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
- String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
+ return generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
- Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
-
- String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
- log.info("sqoop script: {}", resultScripts);
- return resultScripts;
+ }
+ @Override
+ protected Map<String, String> getProperties() {
+ return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
}
@Override