You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/12/07 06:10:34 UTC
[incubator-dolphinscheduler] branch 1.3.4-prepare updated:
[FIX-3900][server] Cherry pick from dev to kill multi yarn app in one job
(#4151)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
new d796a63 [FIX-3900][server] Cherry pick from dev to kill multi yarn app in one job (#4151)
d796a63 is described below
commit d796a63b123274fb4f6af74d1593b779fd430c96
Author: lgcareer <18...@163.com>
AuthorDate: Mon Dec 7 14:10:27 2020 +0800
[FIX-3900][server] Cherry pick from dev to kill multi yarn app in one job (#4151)
* [FIX-3900][server] Cherry pick from dev to kill multi yarn app in one job
* [FIX-3900][server] Cherry pick from dev to kill multi yarn app in one job
Co-authored-by: Eights-LI <ye...@gmail.com>
---
.../apache/dolphinscheduler/common/Constants.java | 10 +
.../server/utils/ProcessUtils.java | 714 +++++++++++----------
.../server/utils/ProcessUtilsTest.java | 111 +++-
3 files changed, 489 insertions(+), 346 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 56f3112..a35bc18 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -970,4 +970,14 @@ public final class Constants {
public static final int ABNORMAL_NODE_STATUS = 1;
+ /**
+ * exec shell scripts
+ */
+ public static final String SH = "sh";
+
+ /**
+ * pstree, get pid and sub pid
+ */
+ public static final String PSTREE = "pstree";
+
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 5074a5e..270f43e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -16,392 +16,436 @@
*/
package org.apache.dolphinscheduler.server.utils;
-import java.nio.charset.StandardCharsets;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
/**
- * mainly used to get the start command line of a process
+ * mainly used to get the start command line of a process.
*/
public class ProcessUtils {
- /**
- * logger
- */
- private final static Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
-
- /**
- * build command line characters
- * @param commandList command list
- * @return command
- * @throws IOException io exception
- */
- public static String buildCommandStr(List<String> commandList) throws IOException {
- String cmdstr;
- String[] cmd = commandList.toArray(new String[commandList.size()]);
- SecurityManager security = System.getSecurityManager();
- boolean allowAmbiguousCommands = false;
- if (security == null) {
- allowAmbiguousCommands = true;
- String value = System.getProperty("jdk.lang.Process.allowAmbiguousCommands");
- if (value != null) {
- allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
- }
+ /**
+ * logger
+ */
+ private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
+
+ /**
+ * Precompiled regular expressions: compiling them once here avoids recompiling on every call,
+ * and the shared Pattern instances can be used safely from multiple threads.
+ */
+ private static final Pattern MACPATTERN = Pattern.compile("-[+|-]-\\s(\\d+)");
+
+ private static final Pattern LINUXPATTERN = Pattern.compile("(\\d+)");
+
+ private static final String LOCAL_PROCESS_EXEC = "jdk.lang.Process.allowAmbiguousCommands";
+
+ /**
+ * build command line characters.
+ *
+ * @param commandList command list
+ * @return command
+ * @throws IOException io exception
+ */
+ public static String buildCommandStr(List<String> commandList) throws IOException {
+ String cmdstr;
+ String[] cmd = commandList.toArray(new String[0]);
+ SecurityManager security = System.getSecurityManager();
+ boolean allowAmbiguousCommands = isAllowAmbiguousCommands(security);
+ if (allowAmbiguousCommands) {
+
+ String executablePath = new File(cmd[0]).getPath();
+
+ if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
+ executablePath = quoteString(executablePath);
+ }
+
+ cmdstr = createCommandLine(
+ VERIFICATION_LEGACY, executablePath, cmd);
+ } else {
+ String executablePath;
+ try {
+ executablePath = getExecutablePath(cmd[0]);
+ } catch (IllegalArgumentException e) {
+
+ StringBuilder join = new StringBuilder();
+ for (String s : cmd) {
+ join.append(s).append(' ');
+ }
+
+ cmd = getTokensFromCommand(join.toString());
+ executablePath = getExecutablePath(cmd[0]);
+
+ // Check new executable name once more
+ if (security != null) {
+ security.checkExec(executablePath);
+ }
+ }
+
+ cmdstr = createCommandLine(
+
+ isShellFile(executablePath) ? VERIFICATION_CMD_BAT : VERIFICATION_WIN32, quoteString(executablePath), cmd);
+ }
+ return cmdstr;
}
- if (allowAmbiguousCommands) {
- String executablePath = new File(cmd[0]).getPath();
-
- if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
- executablePath = quoteString(executablePath);
- }
-
- cmdstr = createCommandLine(
- VERIFICATION_LEGACY, executablePath, cmd);
- } else {
- String executablePath;
- try {
- executablePath = getExecutablePath(cmd[0]);
- } catch (IllegalArgumentException e) {
-
- StringBuilder join = new StringBuilder();
- for (String s : cmd) {
- join.append(s).append(' ');
+ /**
+ * check is allow ambiguous commands
+ *
+ * @param security security manager
+ * @return allow ambiguous command flag
+ */
+ private static boolean isAllowAmbiguousCommands(SecurityManager security) {
+ boolean allowAmbiguousCommands = false;
+ if (security == null) {
+ allowAmbiguousCommands = true;
+ String value = System.getProperty(LOCAL_PROCESS_EXEC);
+ if (value != null) {
+ allowAmbiguousCommands = !Constants.STRING_FALSE.equalsIgnoreCase(value);
+ }
}
+ return allowAmbiguousCommands;
+ }
- cmd = getTokensFromCommand(join.toString());
- executablePath = getExecutablePath(cmd[0]);
-
- // Check new executable name once more
- if (security != null) {
- security.checkExec(executablePath);
- }
- }
+ /**
+ * get executable path.
+ *
+ * @param path path
+ * @return executable path
+ * @throws IOException io exception
+ */
+ private static String getExecutablePath(String path) throws IOException {
+ boolean pathIsQuoted = isQuoted(true, path, "Executable name has embedded quote, split the arguments");
+
+ File fileToRun = new File(pathIsQuoted ? path.substring(1, path.length() - 1) : path);
+ return fileToRun.getPath();
+ }
+ /**
+ * whether is shell file.
+ *
+ * @param executablePath executable path
+ * @return true if endsWith .CMD or .BAT
+ */
+ private static boolean isShellFile(String executablePath) {
+ String upPath = executablePath.toUpperCase();
+ return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT"));
+ }
- cmdstr = createCommandLine(
+ /**
+ * quote string
+ *
+ * @param arg argument
+ * @return format arg
+ */
+ private static String quoteString(String arg) {
+ StringBuilder argbuf = new StringBuilder(arg.length() + 2);
+ return argbuf.append('"').append(arg).append('"').toString();
+ }
- isShellFile(executablePath) ? VERIFICATION_CMD_BAT : VERIFICATION_WIN32, quoteString(executablePath), cmd);
+ /**
+ * get tokens from command.
+ *
+ * @param command command
+ * @return token string array
+ */
+ private static String[] getTokensFromCommand(String command) {
+ ArrayList<String> matchList = new ArrayList<>(8);
+ Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
+ while (regexMatcher.find()) {
+ matchList.add(regexMatcher.group());
+ }
+ return matchList.toArray(new String[0]);
}
- return cmdstr;
- }
-
- /**
- * get executable path
- *
- * @param path path
- * @return executable path
- * @throws IOException io exception
- */
- private static String getExecutablePath(String path) throws IOException {
- boolean pathIsQuoted = isQuoted(true, path, "Executable name has embedded quote, split the arguments");
-
- File fileToRun = new File(pathIsQuoted ? path.substring(1, path.length() - 1) : path);
- return fileToRun.getPath();
- }
-
- /**
- * whether is shell file
- *
- * @param executablePath executable path
- * @return true if endsWith .CMD or .BAT
- */
- private static boolean isShellFile(String executablePath) {
- String upPath = executablePath.toUpperCase();
- return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT"));
- }
-
- /**
- * quote string
- *
- * @param arg argument
- * @return format arg
- */
- private static String quoteString(String arg) {
- StringBuilder argbuf = new StringBuilder(arg.length() + 2);
- return argbuf.append('"').append(arg).append('"').toString();
- }
-
- /**
- * get tokens from command
- *
- * @param command command
- * @return token string array
- */
- private static String[] getTokensFromCommand(String command) {
- ArrayList<String> matchList = new ArrayList<>(8);
- Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
- while (regexMatcher.find()) {
- matchList.add(regexMatcher.group());
+
+ /**
+ * Lazy Pattern
+ */
+ private static class LazyPattern {
+ /**
+ * Escape-support version:
+ * "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
+ */
+ private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\"");
}
- return matchList.toArray(new String[matchList.size()]);
- }
-
- /**
- * Lazy Pattern
- */
- private static class LazyPattern {
- // Escape-support version:
- // "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
- private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\"");
- }
-
- /**
- * verification cmd bat
- */
- private static final int VERIFICATION_CMD_BAT = 0;
-
- /**
- * verification win32
- */
- private static final int VERIFICATION_WIN32 = 1;
-
- /**
- * verification legacy
- */
- private static final int VERIFICATION_LEGACY = 2;
-
- /**
- * escape verification
- */
- private static final char[][] ESCAPE_VERIFICATION = {{' ', '\t', '<', '>', '&', '|', '^'},
-
- {' ', '\t', '<', '>'}, {' ', '\t'}};
-
- /**
- * matcher
- */
- private static Matcher matcher;
-
- /**
- * create command line
- * @param verificationType verification type
- * @param executablePath executable path
- * @param cmd cmd
- * @return command line
- */
- private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) {
- StringBuilder cmdbuf = new StringBuilder(80);
-
- cmdbuf.append(executablePath);
-
- for (int i = 1; i < cmd.length; ++i) {
- cmdbuf.append(' ');
- String s = cmd[i];
- if (needsEscaping(verificationType, s)) {
- cmdbuf.append('"').append(s);
-
- if ((verificationType != VERIFICATION_CMD_BAT) && s.endsWith("\\")) {
- cmdbuf.append('\\');
+
+ /**
+ * verification cmd bat
+ */
+ private static final int VERIFICATION_CMD_BAT = 0;
+
+ /**
+ * verification win32
+ */
+ private static final int VERIFICATION_WIN32 = 1;
+
+ /**
+ * verification legacy
+ */
+ private static final int VERIFICATION_LEGACY = 2;
+
+ /**
+ * escape verification
+ */
+ private static final char[][] ESCAPE_VERIFICATION = {{' ', '\t', '<', '>', '&', '|', '^'},
+
+ {' ', '\t', '<', '>'}, {' ', '\t'}};
+
+ /**
+ * create command line
+ *
+ * @param verificationType verification type
+ * @param executablePath executable path
+ * @param cmd cmd
+ * @return command line
+ */
+ private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) {
+ StringBuilder cmdbuf = new StringBuilder(80);
+
+ cmdbuf.append(executablePath);
+
+ for (int i = 1; i < cmd.length; ++i) {
+ cmdbuf.append(' ');
+ String s = cmd[i];
+ if (needsEscaping(verificationType, s)) {
+ cmdbuf.append('"').append(s);
+
+ if ((verificationType != VERIFICATION_CMD_BAT) && s.endsWith("\\")) {
+ cmdbuf.append('\\');
+ }
+ cmdbuf.append('"');
+ } else {
+ cmdbuf.append(s);
+ }
}
- cmdbuf.append('"');
- } else {
- cmdbuf.append(s);
- }
+ return cmdbuf.toString();
}
- return cmdbuf.toString();
- }
-
- /**
- * whether is quoted
- * @param noQuotesInside
- * @param arg
- * @param errorMessage
- * @return boolean
- */
- private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) {
- int lastPos = arg.length() - 1;
- if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') {
- // The argument has already been quoted.
- if (noQuotesInside) {
- if (arg.indexOf('"', 1) != lastPos) {
- // There is ["] inside.
- throw new IllegalArgumentException(errorMessage);
+
+ /**
+ * whether is quoted
+ *
+ * @param noQuotesInside no quotes inside
+ * @param arg arg
+ * @param errorMessage error message
+ * @return boolean
+ */
+ private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) {
+ int lastPos = arg.length() - 1;
+ if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') {
+ // The argument has already been quoted.
+ if (noQuotesInside && arg.indexOf('"', 1) != lastPos) {
+ // There is ["] inside.
+ throw new IllegalArgumentException(errorMessage);
+ }
+ return true;
}
- }
- return true;
- }
- if (noQuotesInside) {
- if (arg.indexOf('"') >= 0) {
- // There is ["] inside.
- throw new IllegalArgumentException(errorMessage);
- }
- }
- return false;
- }
-
- /**
- * whether needs escaping
- *
- * @param verificationType verification type
- * @param arg arg
- * @return boolean
- */
- private static boolean needsEscaping(int verificationType, String arg) {
-
- boolean argIsQuoted = isQuoted((verificationType == VERIFICATION_CMD_BAT), arg, "Argument has embedded quote, use the explicit CMD.EXE call.");
-
- if (!argIsQuoted) {
- char[] testEscape = ESCAPE_VERIFICATION[verificationType];
- for (int i = 0; i < testEscape.length; ++i) {
- if (arg.indexOf(testEscape[i]) >= 0) {
- return true;
+ if (noQuotesInside && arg.indexOf('"') >= 0) {
+ // There is ["] inside.
+ throw new IllegalArgumentException(errorMessage);
}
- }
+ return false;
}
- return false;
- }
-
- /**
- * kill yarn application
- *
- * @param appIds app id list
- * @param logger logger
- * @param tenantCode tenant code
- * @param executePath execute path
- * @throws IOException io exception
- */
- public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode,String executePath)
- throws IOException {
- if (appIds.size() > 0) {
- String appid = appIds.get(appIds.size() - 1);
- String commandFile = String
- .format("%s/%s.kill", executePath, appid);
- String cmd = "yarn application -kill " + appid;
- try {
- StringBuilder sb = new StringBuilder();
- sb.append("#!/bin/sh\n");
- sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
- sb.append("cd $BASEDIR\n");
- if (CommonUtils.getSystemEnvPath() != null) {
- sb.append("source " + CommonUtils.getSystemEnvPath() + "\n");
- }
- sb.append("\n\n");
- sb.append(cmd);
-
- File f = new File(commandFile);
- if (!f.exists()) {
- FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
+ /**
+ * whether needs escaping
+ *
+ * @param verificationType verification type
+ * @param arg arg
+ * @return boolean
+ */
+ private static boolean needsEscaping(int verificationType, String arg) {
+
+ boolean argIsQuoted = isQuoted((verificationType == VERIFICATION_CMD_BAT), arg, "Argument has embedded quote, use the explicit CMD.EXE call.");
+
+ if (!argIsQuoted) {
+ char[] testEscape = ESCAPE_VERIFICATION[verificationType];
+ for (char c : testEscape) {
+ if (arg.indexOf(c) >= 0) {
+ return true;
+ }
+ }
}
+ return false;
+ }
- String runCmd = "sh " + commandFile;
- if (StringUtils.isNotEmpty(tenantCode)) {
- runCmd = "sudo -u " + tenantCode + " " + runCmd;
+ /**
+ * kill yarn application
+ *
+ * @param appIds app id list
+ * @param logger logger
+ * @param tenantCode tenant code
+ * @param executePath execute path
+ */
+ public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
+ if (CollectionUtils.isNotEmpty(appIds)) {
+
+ for (String appId : appIds) {
+ try {
+ ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
+
+ if (!applicationStatus.typeIsFinished()) {
+ String commandFile = String
+ .format("%s/%s.kill", executePath, appId);
+ String cmd = "yarn application -kill " + appId;
+ execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
+ }
+ } catch (Exception e) {
+ logger.error(String.format("Get yarn application app id [%s] status failed: [%s]", appId, e.getMessage()));
+ }
+ }
}
+ }
- logger.info("kill cmd:{}", runCmd);
-
- Runtime.getRuntime().exec(runCmd);
- } catch (Exception e) {
- logger.error("kill application error", e);
- }
+ /**
+ * build kill command for yarn application
+ *
+ * @param logger logger
+ * @param tenantCode tenant code
+ * @param appId app id
+ * @param commandFile command file
+ * @param cmd cmd
+ */
+ private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile, String cmd) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("#!/bin/sh\n");
+ sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
+ sb.append("cd $BASEDIR\n");
+ if (CommonUtils.getSystemEnvPath() != null) {
+ sb.append("source ").append(CommonUtils.getSystemEnvPath()).append("\n");
+ }
+ sb.append("\n\n");
+ sb.append(cmd);
+
+ File f = new File(commandFile);
+
+ if (!f.exists()) {
+ FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
+ }
+
+ String runCmd = String.format("%s %s", Constants.SH, commandFile);
+ if (StringUtils.isNotEmpty(tenantCode)) {
+ runCmd = "sudo -u " + tenantCode + " " + runCmd;
+ }
+
+ logger.info("kill cmd:{}", runCmd);
+ OSUtils.exeCmd(runCmd);
+ } catch (Exception e) {
+ logger.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage()));
+ }
}
- }
- /**
- * kill tasks according to different task types
- *
- * @param taskExecutionContext taskExecutionContext
- */
- public static void kill(TaskExecutionContext taskExecutionContext) {
- try {
- int processId = taskExecutionContext.getProcessId();
- if(processId == 0 ){
- logger.error("process kill failed, process id :{}, task id:{}",
- processId, taskExecutionContext.getTaskInstanceId());
- return ;
- }
+ /**
+ * kill tasks according to different task types
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ public static void kill(TaskExecutionContext taskExecutionContext) {
+ try {
+ int processId = taskExecutionContext.getProcessId();
+ if (processId == 0) {
+ logger.error("process kill failed, process id :{}, task id:{}",
+ processId, taskExecutionContext.getTaskInstanceId());
+ return;
+ }
- String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
+ String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
- logger.info("process id:{}, cmd:{}", processId, cmd);
+ logger.info("process id:{}, cmd:{}", processId, cmd);
- OSUtils.exeCmd(cmd);
+ OSUtils.exeCmd(cmd);
- // find log and kill yarn job
- killYarnJob(taskExecutionContext);
+ // find log and kill yarn job
+ killYarnJob(taskExecutionContext);
- } catch (Exception e) {
- logger.error("kill task failed", e);
- }
- }
-
- /**
- * get pids str
- *
- * @param processId process id
- * @return pids
- * @throws Exception exception
- */
- public static String getPidsStr(int processId)throws Exception{
- StringBuilder sb = new StringBuilder();
- Matcher mat;
- // pstree pid get sub pids
- if (OSUtils.isMacOS()) {
- String pids = OSUtils.exeCmd("pstree -sp " + processId);
- mat = Pattern.compile("-[+|-]-\\s(\\d+)").matcher(pids);
- } else {
- String pids = OSUtils.exeCmd("pstree -p " + processId);
- mat = Pattern.compile("(\\d+)").matcher(pids);
+ } catch (Exception e) {
+ logger.error("kill task failed", e);
+ }
}
- while (mat.find()){
- sb.append(mat.group(1)).append(" ");
- }
- return sb.toString().trim();
- }
-
- /**
- * find logs and kill yarn tasks
- *
- * @param taskExecutionContext taskExecutionContext
- */
- public static void killYarnJob(TaskExecutionContext taskExecutionContext) {
- try {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- LogClientService logClient = null;
- String log = null;
- try {
- logClient = new LogClientService();
- log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
- Constants.RPC_PORT,
- taskExecutionContext.getLogPath());
- } finally {
- if(logClient != null){
- logClient.close();
- }
- }
- if (StringUtils.isNotEmpty(log)) {
- List<String> appIds = LoggerUtils.getAppIds(log, logger);
- String workerDir = taskExecutionContext.getExecutePath();
- if (StringUtils.isEmpty(workerDir)) {
- logger.error("task instance work dir is empty");
- throw new RuntimeException("task instance work dir is empty");
+ /**
+ * get pids str
+ *
+ * @param processId process id
+ * @return pids pid String
+ * @throws Exception exception
+ */
+ public static String getPidsStr(int processId) throws Exception {
+ StringBuilder sb = new StringBuilder();
+ Matcher mat = null;
+ // pstree pid get sub pids
+ if (OSUtils.isMacOS()) {
+ String pids = OSUtils.exeCmd(String.format("%s -sp %d", Constants.PSTREE, processId));
+ if (null != pids) {
+ mat = MACPATTERN.matcher(pids);
+ }
+ } else {
+ String pids = OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, processId));
+ mat = LINUXPATTERN.matcher(pids);
}
- if (appIds.size() > 0) {
- cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+
+ if (null != mat) {
+ while (mat.find()) {
+ sb.append(mat.group(1)).append(" ");
+ }
}
- }
- } catch (Exception e) {
- logger.error("kill yarn job failure",e);
+ return sb.toString().trim();
+ }
+
+ /**
+ * find logs and kill yarn tasks
+ *
+ * @param taskExecutionContext taskExecutionContext
+ */
+ public static void killYarnJob(TaskExecutionContext taskExecutionContext) {
+ try {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ LogClientService logClient = null;
+ String log;
+ try {
+ logClient = new LogClientService();
+ log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
+ Constants.RPC_PORT,
+ taskExecutionContext.getLogPath());
+ } finally {
+ if (logClient != null) {
+ logClient.close();
+ }
+ }
+ if (StringUtils.isNotEmpty(log)) {
+ List<String> appIds = LoggerUtils.getAppIds(log, logger);
+ String workerDir = taskExecutionContext.getExecutePath();
+ if (StringUtils.isEmpty(workerDir)) {
+ logger.error("task instance work dir is empty");
+ throw new RuntimeException("task instance work dir is empty");
+ }
+ if (CollectionUtils.isNotEmpty(appIds)) {
+ cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("kill yarn job failure", e);
+ }
}
- }
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
index 1e0adaa..5f8a080 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
@@ -16,35 +16,124 @@
*/
package org.apache.dolphinscheduler.server.utils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({System.class, OSUtils.class, HadoopUtils.class})
public class ProcessUtilsTest {
- private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
@Test
public void getPidsStr() throws Exception {
- String pidList = ProcessUtils.getPidsStr(1);
+ int processId = 1;
+ String pidList = ProcessUtils.getPidsStr(processId);
Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, "");
- logger.info("Sub process list : {}", pidList);
+
+ PowerMockito.mockStatic(OSUtils.class);
+ when(OSUtils.isMacOS()).thenReturn(true);
+ when(OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, processId))).thenReturn(null);
+ String pidListMac = ProcessUtils.getPidsStr(processId);
+ Assert.assertEquals("", pidListMac);
}
@Test
- public void testBuildCommandStr() {
+ public void testBuildCommandStr() throws IOException {
List<String> commands = new ArrayList<>();
commands.add("sudo");
+ commands.add("-u");
+ commands.add("tenantCode");
+ //allowAmbiguousCommands true (default when no SecurityManager is installed and the property is unset)
+ Assert.assertEquals("sudo -u tenantCode", ProcessUtils.buildCommandStr(commands));
+
+ //quoted executable
+ commands.clear();
+ commands.add("\"sudo\"");
+ Assert.assertEquals("\"sudo\"", ProcessUtils.buildCommandStr(commands));
+
+ //allowAmbiguousCommands false (property explicitly set to "false" below)
+ commands.clear();
+ commands.add("sudo");
+ System.setProperty("jdk.lang.Process.allowAmbiguousCommands", "false");
+ Assert.assertEquals("\"sudo\"", ProcessUtils.buildCommandStr(commands));
+ }
+
+ @Test
+ public void testKill() {
+ //get taskExecutionContext
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+
+ //process id eq 0
+ taskExecutionContext.setProcessId(0);
+ ProcessUtils.kill(taskExecutionContext);
+
+ //process id not eq 0
+ taskExecutionContext.setProcessId(1);
+ PowerMockito.mockStatic(OSUtils.class);
try {
- Assert.assertEquals(ProcessUtils.buildCommandStr(commands), "sudo");
- } catch (IOException e) {
- Assert.fail(e.getMessage());
+ when(OSUtils.exeCmd(String.format("%s -sp %d", Constants.PSTREE, 1))).thenReturn("1111");
+ when(OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, 1))).thenReturn("1111");
+ when(OSUtils.exeCmd("sudo kill -9")).thenReturn("1111");
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ taskExecutionContext.setHost("127.0.0.1:8888");
+ taskExecutionContext.setLogPath("/log/1.log");
+ ProcessUtils.kill(taskExecutionContext);
+ Assert.assertEquals(1, taskExecutionContext.getProcessId());
}
+ @Test
+ public void testCancelApplication() {
+ List<String> appIds = new ArrayList<>();
+ appIds.add("application_1585532379175_228491");
+ appIds.add("application_1598885606600_3677");
+ String tenantCode = "dev";
+ String executePath = "/ds-exec/1/1/1";
+ ExecutionStatus running = ExecutionStatus.RUNNING_EXEUTION;
+
+ PowerMockito.mockStatic(HadoopUtils.class);
+ HadoopUtils hadoop = HadoopUtils.getInstance();
+
+ try {
+ PowerMockito.whenNew(HadoopUtils.class).withAnyArguments().thenReturn(hadoop);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
+ when(hadoop.getApplicationStatus("application_1585532379175_228491")).thenReturn(running);
+ when(hadoop.getApplicationStatus("application_1598885606600_3677")).thenReturn(running);
+ } catch (Exception e) {
+ e.printStackTrace();
+ ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
+ }
+
+ Assert.assertNotNull(appIds);
+ }
}