Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2009/12/04 06:06:36 UTC
svn commit: r887061 - in /hadoop/mapreduce/trunk: ./
src/c++/task-controller/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/util/
src/test/mapred/org/apache/hadoop/mapred/
Author: tomwhite
Date: Fri Dec 4 05:06:34 2009
New Revision: 887061
URL: http://svn.apache.org/viewvc?rev=887061&view=rev
Log:
MAPREDUCE-1119. When tasks fail to report status, show the task's stack dump before killing. Contributed by Aaron Kimball.
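For context, the patch relies on the HotSpot JVM's standard handling of SIGQUIT: on receiving the signal, the JVM writes a full thread dump to its stdout and keeps running. A minimal, self-contained sketch (not part of this commit; the class name is illustrative) for observing that behavior:

    // Start this program, note its pid, and run "kill -QUIT <pid>" from a
    // shell; the JVM prints a full thread dump to stdout and keeps running.
    public class StuckTask {
      public static void main(String[] args) throws InterruptedException {
        // Simulates a task that has stopped reporting progress.
        Thread.sleep(Long.MAX_VALUE);
      }
    }

That thread dump is what the TaskTracker now requests from an unresponsive task JVM before purging it.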
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
hadoop/mapreduce/trunk/src/c++/task-controller/main.c
hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 4 05:06:34 2009
@@ -57,6 +57,9 @@
MAPREDUCE-1190. Add package documentation for BBP example.
(Tsz Wo (Nicholas) Sze via cdouglas)
+ MAPREDUCE-1119. When tasks fail to report status, show tasks's stack dump
+ before killing. (Aaron Kimball via tomwhite)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/Makefile.in?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/Makefile.in Fri Dec 4 05:06:34 2009
@@ -26,22 +26,22 @@
TESTBINARY=${testdir}/test-task-controller
all: $(OBJS)
- $(CC) $(CFLAG) -o $(BINARY) $(OBJS)
+ $(CC) $(CFLAGS) -o $(BINARY) $(OBJS)
main.o: main.c task-controller.h
- $(CC) $(CFLAG) -o main.o -c main.c
+ $(CC) $(CFLAGS) -o main.o -c main.c
task-controller.o: task-controller.c task-controller.h
- $(CC) $(CFLAG) -o task-controller.o -c task-controller.c
+ $(CC) $(CFLAGS) -o task-controller.o -c task-controller.c
configuration.o: configuration.h configuration.c
- $(CC) $(CFLAG) -o configuration.o -c configuration.c
+ $(CC) $(CFLAGS) -o configuration.o -c configuration.c
${testdir}/test-task-controller.o: task-controller.c task-controller.h
- $(CC) $(CFLAG) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
+ $(CC) $(CFLAGS) -o ${testdir}/test-task-controller.o -c ${testdir}/test-task-controller.c
test: $(TESTOBJS)
- $(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS)
+ $(CC) $(CFLAGS) -o $(TESTBINARY) $(TESTOBJS)
clean:
rm -rf $(BINARY) $(OBJS) $(TESTOBJS)
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/main.c?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/main.c Fri Dec 4 05:06:34 2009
@@ -58,6 +58,7 @@
NULL, 0 } };
const char* log_file = NULL;
+ int conf_dir_len = 0;
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
@@ -67,10 +68,17 @@
}
#ifndef HADOOP_CONF_DIR
- hadoop_conf_dir = (char *) malloc (sizeof(char) *
- (strlen(argv[0]) - strlen(EXEC_PATTERN)) + 1);
- strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
- hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
+ conf_dir_len = (strlen(argv[0]) - strlen(EXEC_PATTERN)) + 1;
+ if (conf_dir_len < 1) {
+ // We didn't get an absolute path to our argv[0]; bail.
+ printf("Cannot find configuration directory.\n");
+ printf("This program must be run with its full absolute path.\n");
+ return INVALID_CONF_DIR;
+ } else {
+ hadoop_conf_dir = (char *) malloc (sizeof(char) * conf_dir_len);
+ strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
+ hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
+ }
#endif
do {
next_option = getopt_long(argc, argv, short_options, long_options, NULL);
@@ -142,6 +150,10 @@
exit_code
= run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
break;
+ case SIGQUIT_TASK_JVM:
+ task_pid = argv[optind++];
+ exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGQUIT);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Fri Dec 4 05:06:34 2009
@@ -35,6 +35,7 @@
}
if(initgroups(user_detail->pw_name, user_detail->pw_gid) != 0) {
+ fprintf(LOGFILE, "unable to initgroups : %s\n", strerror(errno));
cleanup();
return SETUID_OPER_FAILED;
}
@@ -1029,7 +1030,8 @@
return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
}
/**
- * Function used to terminate/kill a task launched by the user.
+ * Function used to terminate/kill a task launched by the user,
+ * or dump the process' stack (by sending SIGQUIT).
* The function sends appropriate signal to the process group
* specified by the task_pid.
*/
Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Fri Dec 4 05:06:34 2009
@@ -45,6 +45,7 @@
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
RUN_DEBUG_SCRIPT,
+ SIGQUIT_TASK_JVM,
};
enum errorcodes {
@@ -69,6 +70,7 @@
INITIALIZE_DISTCACHE_FAILED, //19
INITIALIZE_USER_FAILED, //20
UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
+ INVALID_CONF_DIR, //22
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Fri Dec 4 05:06:34 2009
@@ -119,14 +119,14 @@
if (shexec != null) {
if (Shell.WINDOWS) {
//We don't do send kill process signal in case of windows as
- //already we have done a process.destroy() in termintateTaskJVM()
+ //already we have done a process.destroy() in terminateTaskJVM()
return;
}
String pid = context.pid;
if (pid != null) {
if(ProcessTree.isSetsidAvailable) {
ProcessTree.killProcessGroup(pid);
- }else {
+ } else {
ProcessTree.killProcess(pid);
}
}
@@ -134,6 +134,26 @@
}
@Override
+ void dumpTaskStack(TaskControllerContext context) {
+ ShellCommandExecutor shexec = context.shExec;
+ if (shexec != null) {
+ if (Shell.WINDOWS) {
+ // We don't use signals in Windows.
+ return;
+ }
+ String pid = context.pid;
+ if (pid != null) {
+ // Send SIGQUIT to get a stack dump
+ if (ProcessTree.isSetsidAvailable) {
+ ProcessTree.sigQuitProcessGroup(pid);
+ } else {
+ ProcessTree.sigQuitProcess(pid);
+ }
+ }
+ }
+ }
+
+ @Override
public void initializeDistributedCache(InitializationContext context) {
// Do nothing.
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmManager.java Fri Dec 4 05:06:34 2009
@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.util.ProcessTree;
class JvmManager {
@@ -136,6 +137,14 @@
}
}
+ void dumpStack(TaskRunner tr) {
+ if (tr.getTask().isMapTask()) {
+ mapJvmManager.dumpStack(tr);
+ } else {
+ reduceJvmManager.dumpStack(tr);
+ }
+ }
+
public void killJvm(JVMId jvmId) {
if (jvmId.isMap) {
mapJvmManager.killJvm(jvmId);
@@ -243,6 +252,16 @@
}
}
+ synchronized void dumpStack(TaskRunner tr) {
+ JVMId jvmId = runningTaskToJvm.get(tr);
+ if (null != jvmId) {
+ JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+ if (null != jvmRunner) {
+ jvmRunner.dumpChildStacks();
+ }
+ }
+ }
+
synchronized public void stop() {
//since the kill() method invoked later on would remove
//an entry from the jvmIdToRunner map, we create a
@@ -459,7 +478,38 @@
removeJvm(jvmId);
}
}
-
+
+ /** Send a signal to the JVM requesting that it dump a stack trace,
+ * and wait for a timeout interval to give this signal time to be
+ * processed.
+ */
+ void dumpChildStacks() {
+ if (!killed) {
+ TaskController controller = tracker.getTaskController();
+ // Check inital context before issuing a signal to prevent situations
+ // where signal is issued before task is launched.
+ if (initalContext != null && initalContext.env != null) {
+ initalContext.pid = jvmIdToPid.get(jvmId);
+ initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+ .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
+ ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+ // signal the task jvm
+ controller.dumpTaskStack(initalContext);
+
+ // We're going to kill the jvm with SIGKILL after this,
+ // so we should wait for a few seconds first to ensure that
+ // the SIGQUIT has time to be processed.
+ try {
+ Thread.sleep(initalContext.sleeptimeBeforeSigkill);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep interrupted : " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+
public void taskRan() {
busy = false;
numTasksRan++;
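The new JvmRunner.dumpChildStacks() above sends the SIGQUIT and then deliberately sleeps for the configured grace period, because the caller goes on to kill the JVM and the dump would otherwise be lost. A simplified sketch of that ordering (names are illustrative, and it assumes the org.apache.hadoop.mapred package since dumpTaskStack() is package-private):

    // Ask the task JVM for a thread dump, then wait so the dump can be written
    // before the caller's kill path (purgeTask -> destroyTaskJVM) runs.
    void requestStackDump(TaskController controller,
                          TaskControllerContext context,
                          long graceMillis) {
      controller.dumpTaskStack(context);  // sends SIGQUIT to the task JVM
      try {
        Thread.sleep(graceMillis);        // grace period before SIGTERM/SIGKILL
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }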
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Fri Dec 4 05:06:34 2009
@@ -89,6 +89,7 @@
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
RUN_DEBUG_SCRIPT,
+ SIGQUIT_TASK_JVM,
}
/**
@@ -443,16 +444,16 @@
}
/**
- * Convenience method used to sending appropriate Kill signal to the task
+ * Convenience method used to sending appropriate signal to the task
* VM
* @param context
* @param command
* @throws IOException
*/
- private void finishTask(TaskControllerContext context,
+ protected void signalTask(TaskControllerContext context,
TaskCommands command) throws IOException{
if(context.task == null) {
- LOG.info("Context task null not killing the JVM");
+ LOG.info("Context task is null; not signaling the JVM");
return;
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
@@ -469,7 +470,7 @@
@Override
void terminateTask(TaskControllerContext context) {
try {
- finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+ signalTask(context, TaskCommands.TERMINATE_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending kill to the Task VM " +
StringUtils.stringifyException(e));
@@ -479,13 +480,23 @@
@Override
void killTask(TaskControllerContext context) {
try {
- finishTask(context, TaskCommands.KILL_TASK_JVM);
+ signalTask(context, TaskCommands.KILL_TASK_JVM);
} catch (Exception e) {
LOG.warn("Exception thrown while sending destroy to the Task VM " +
StringUtils.stringifyException(e));
}
}
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ try {
+ signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Fri Dec 4 05:06:34 2009
@@ -134,26 +134,26 @@
/**
* Top level cleanup a task JVM method.
- *
- * The current implementation does the following.
* <ol>
- * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
+ * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
* to cleanup.</li>
- * <li>Waits for stipulated period</li>
* <li>Sends a forceful kill signal to task JVM, terminating all its
- * sub-process forcefully.</li>
+ * sub-processes forcefully.</li>
* </ol>
- *
+ *
* @param context the task for which kill signal has to be sent.
*/
final void destroyTaskJVM(TaskControllerContext context) {
+ // Send SIGTERM to try to ask for a polite exit.
terminateTask(context);
+
try {
Thread.sleep(context.sleeptimeBeforeSigkill);
} catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
+ LOG.warn("Sleep interrupted : " +
StringUtils.stringifyException(e));
}
+
killTask(context);
}
@@ -224,6 +224,15 @@
*/
abstract void killTask(TaskControllerContext context);
+
+ /**
+ * Sends a QUIT signal to direct the task JVM (and sub-processes) to
+ * dump their stack to stdout.
+ *
+ * @param context task context.
+ */
+ abstract void dumpTaskStack(TaskControllerContext context);
+
/**
* Initialize user on this TaskTracer in a TaskController specific manner.
*
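Since dumpTaskStack() is declared abstract here, every TaskController implementation must now provide it; as the Windows branch in DefaultTaskController shows, a no-op is a legal choice where signals are unavailable. A hypothetical subclass, following the same override pattern the test controllers below use (names are illustrative; like those tests, it assumes the org.apache.hadoop.mapred package):

    // Illustrative only: a controller that opts out of stack dumps entirely.
    class QuietTaskController extends DefaultTaskController {
      @Override
      void dumpTaskStack(TaskControllerContext context) {
        // Intentionally skip the SIGQUIT; destroyTaskJVM() still terminates
        // and kills the task JVM afterwards.
      }
    }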
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec 4 05:06:34 2009
@@ -93,6 +93,8 @@
public TaskTracker.TaskInProgress getTaskInProgress() { return tip; }
public TaskTracker getTracker() { return tracker; }
+ public JvmManager getJvmManager() { return jvmManager; }
+
/** Called to assemble this task's input. This method is run in the parent
* process before the child is spawned. It should not execute user code,
* only system code. */
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 4 05:06:34 2009
@@ -1509,6 +1509,7 @@
ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
tip.reportDiagnosticInfo(msg);
myInstrumentation.timedoutTask(tip.getTask().getTaskID());
+ dumpTaskStack(tip);
purgeTask(tip, true);
}
}
@@ -1516,6 +1517,22 @@
}
/**
+ * Send a signal to a stuck task commanding it to dump stack traces
+ * to stderr before we kill it with purgeTask().
+ *
+ * @param tip {@link TaskInProgress} to dump stack traces.
+ */
+ private void dumpTaskStack(TaskInProgress tip) {
+ TaskRunner runner = tip.getTaskRunner();
+ if (null == runner) {
+ return; // tip is already abandoned.
+ }
+
+ JvmManager jvmMgr = runner.getJvmManager();
+ jvmMgr.dumpStack(runner);
+ }
+
+ /**
* The task tracker is done with this job, so we need to clean up.
* @param action The action with the job
* @throws IOException
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java Fri Dec 4 05:06:34 2009
@@ -37,6 +37,15 @@
public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+ private static final int SIGQUIT = 3;
+ private static final int SIGTERM = 15;
+ private static final int SIGKILL = 9;
+
+ private static final String SIGQUIT_STR = "SIGQUIT";
+ private static final String SIGTERM_STR = "SIGTERM";
+ private static final String SIGKILL_STR = "SIGKILL";
+
+
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
ShellCommandExecutor shexec = null;
@@ -102,43 +111,78 @@
sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
}
+
/**
- * Sends terminate signal to the process, allowing it to gracefully exit.
- *
- * @param pid pid of the process to be sent SIGTERM
+ * Send a specified signal to the specified pid
+ *
+ * @param pid the pid of the process [group] to signal.
+ * @param signalNum the signal to send.
+ * @param signalName the human-readable description of the signal
+ * (for logging).
*/
- public static void terminateProcess(String pid) {
+ private static void sendSignal(String pid, int signalNum, String signalName) {
ShellCommandExecutor shexec = null;
try {
- String[] args = { "kill", pid };
+ String[] args = { "kill", "-" + signalNum, pid };
shexec = new ShellCommandExecutor(args);
shexec.execute();
} catch (IOException ioe) {
LOG.warn("Error executing shell command " + ioe);
} finally {
- LOG.info("Killing process " + pid +
- " with SIGTERM. Exit code " + shexec.getExitCode());
+ if (pid.startsWith("-")) {
+ LOG.info("Sending signal to all members of process group " + pid
+ + ": " + signalName + ". Exit code " + shexec.getExitCode());
+ } else {
+ LOG.info("Signaling process " + pid
+ + " with " + signalName + ". Exit code " + shexec.getExitCode());
+ }
}
}
/**
+ * Send a specified signal to the process, if it is alive.
+ *
+ * @param pid the pid of the process to signal.
+ * @param signalNum the signal to send.
+ * @param signalName the human-readable description of the signal
+ * (for logging).
+ * @param alwaysSignal if true then send signal even if isAlive(pid) is false
+ */
+ private static void maybeSignalProcess(String pid, int signalNum,
+ String signalName, boolean alwaysSignal) {
+ // If process tree is not alive then don't signal, unless alwaysSignal
+ // forces it so.
+ if (alwaysSignal || ProcessTree.isAlive(pid)) {
+ sendSignal(pid, signalNum, signalName);
+ }
+ }
+
+ private static void maybeSignalProcessGroup(String pgrpId, int signalNum,
+ String signalName, boolean alwaysSignal) {
+
+ if (alwaysSignal || ProcessTree.isProcessGroupAlive(pgrpId)) {
+ // signaling a process group means using a negative pid.
+ sendSignal("-" + pgrpId, signalNum, signalName);
+ }
+ }
+
+ /**
+ * Sends terminate signal to the process, allowing it to gracefully exit.
+ *
+ * @param pid pid of the process to be sent SIGTERM
+ */
+ public static void terminateProcess(String pid) {
+ maybeSignalProcess(pid, SIGTERM, SIGTERM_STR, true);
+ }
+
+ /**
* Sends terminate signal to all the process belonging to the passed process
* group, allowing the group to gracefully exit.
*
* @param pgrpId process group id
*/
public static void terminateProcessGroup(String pgrpId) {
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", "--", "-" + pgrpId };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing all processes in the process group " + pgrpId +
- " with SIGTERM. Exit code " + shexec.getExitCode());
- }
+ maybeSignalProcessGroup(pgrpId, SIGTERM, SIGTERM_STR, true);
}
/**
@@ -197,22 +241,17 @@
* @param pid process id
*/
public static void killProcess(String pid) {
+ maybeSignalProcess(pid, SIGKILL, SIGKILL_STR, false);
+ }
- //If process tree is not alive then return immediately.
- if(!ProcessTree.isAlive(pid)) {
- return;
- }
- String[] args = { "kill", "-9", pid };
- ShellCommandExecutor shexec = new ShellCommandExecutor(args);
- try {
- shexec.execute();
- } catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process "+ pid + " ."+
- StringUtils.stringifyException(e));
- } finally {
- LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
+ /**
+ * Sends SIGQUIT to process; Java programs will dump their stack to
+ * stdout.
+ *
+ * @param pid process id
+ */
+ public static void sigQuitProcess(String pid) {
+ maybeSignalProcess(pid, SIGQUIT, SIGQUIT_STR, false);
}
/**
@@ -222,25 +261,20 @@
* @param pgrpId process group id
*/
public static void killProcessGroup(String pgrpId) {
+ maybeSignalProcessGroup(pgrpId, SIGKILL, SIGKILL_STR, false);
+ }
- //If process tree is not alive then return immediately.
- if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
- return;
- }
-
- String[] args = { "kill", "-9", "-"+pgrpId };
- ShellCommandExecutor shexec = new ShellCommandExecutor(args);
- try {
- shexec.execute();
- } catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+
- StringUtils.stringifyException(e));
- } finally {
- LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
+ /**
+ * Sends SIGQUIT to all processes belonging to the same process group,
+ * ordering all processes in the group to send their stack dump to
+ * stdout.
+ *
+ * @param pgrpId process group id
+ */
+ public static void sigQuitProcessGroup(String pgrpId) {
+ maybeSignalProcessGroup(pgrpId, SIGQUIT, SIGQUIT_STR, false);
}
-
+
/**
* Is the process with PID pid still alive?
* This method assumes that isAlive is called on a pid that was alive not
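The ProcessTree refactoring above routes SIGTERM, SIGKILL and the new SIGQUIT through a single sendSignal() helper that shells out to kill(1), prefixing the pid with "-" to address a whole process group, while the maybeSignal* wrappers skip already-dead processes unless alwaysSignal forces the send. A standalone sketch of that core helper (class and method names are illustrative):

    import java.io.IOException;

    public class SignalSketch {
      /** Send signalNum to pid; a pid of the form "-<pgrpId>", as used by the
       *  process-group variants above, addresses the whole group. */
      static void sendSignal(String pid, int signalNum)
          throws IOException, InterruptedException {
        Process p = new ProcessBuilder("kill", "-" + signalNum, pid).start();
        p.waitFor();
      }

      public static void main(String[] args) throws Exception {
        sendSignal(args[0], 3);  // 3 == SIGQUIT; a HotSpot JVM dumps its stacks
      }
    }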
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Dec 4 05:06:34 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
import junit.framework.TestCase;
@@ -48,7 +49,10 @@
* <li>Make the built binary to setuid executable</li>
* <li>Execute following targets:
* <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
- * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code>
+ * <br/>(Note that "path to built binary" means the directory containing task-controller -
+ * not the actual complete path of the binary itself. This path must end in ".../bin")
+ * </li>
* </ol>
*
*/
@@ -72,6 +76,24 @@
void setTaskControllerExe(String execPath) {
this.taskControllerExePath = execPath;
}
+
+ volatile static int attemptedSigQuits = 0;
+ volatile static int failedSigQuits = 0;
+
+ /** Work like LinuxTaskController, but also count the number of
+ * attempted and failed SIGQUIT sends via the task-controller
+ * executable.
+ */
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ attemptedSigQuits++;
+ try {
+ signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
+ failedSigQuits++;
+ }
+ }
}
// cluster instances which sub classes can use
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Fri Dec 4 05:06:34 2009
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
@@ -92,4 +93,30 @@
}
}
}
+
+ /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
+ * if a task times out.
+ */
+ public void testTimeoutStackTrace() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+
+ // Run a job that should timeout and trigger a SIGQUIT.
+ startCluster();
+ JobConf conf = getClusterConf();
+ conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
+ job.setMaxMapAttempts(1);
+ int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
+ job.waitForCompletion(true);
+ assertTrue("Did not detect a new SIGQUIT!",
+ prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
+ assertEquals("A SIGQUIT attempt failed!", 0,
+ MyLinuxTaskController.failedSigQuits);
+
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Fri Dec 4 05:06:34 2009
@@ -18,12 +18,20 @@
package org.apache.hadoop.mapred;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
import java.io.File;
+import java.io.InputStreamReader;
import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.SleepJob;
/**
* A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -31,38 +39,96 @@
*/
public class TestJobKillAndFail extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- public void testJobFailAndKill() throws IOException {
+ /**
+ * TaskController instance that just sets a flag when a stack dump
+ * is performed in a child thread.
+ */
+ static class MockStackDumpTaskController extends DefaultTaskController {
+
+ static volatile int numStackDumps = 0;
+
+ static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
+ public MockStackDumpTaskController() {
+ LOG.info("Instantiated MockStackDumpTC");
+ }
+
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ LOG.info("Got stack-dump request in TaskController");
+ MockStackDumpTaskController.numStackDumps++;
+ super.dumpTaskStack(context);
+ }
+
+ }
+
+ /** If a task was killed, then dumpTaskStack() should have been
+ * called. Test whether or not the counter was incremented
+ * and succeed/fail based on this. */
+ private void checkForStackDump(boolean expectDump, int lastNumDumps) {
+ int curNumDumps = MockStackDumpTaskController.numStackDumps;
+
+ LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
+ + "; expect=" + expectDump);
+
+ if (expectDump) {
+ assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
+ } else {
+ assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
+ }
+ }
+
+ public void testJobFailAndKill() throws Exception {
MiniMRCluster mr = null;
try {
JobConf jtConf = new JobConf();
jtConf.set("mapred.jobtracker.instrumentation",
JTInstrumentation.class.getName());
+ jtConf.set("mapreduce.tasktracker.taskcontroller",
+ MockStackDumpTaskController.class.getName());
mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
JTInstrumentation instr = (JTInstrumentation)
mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
// run the TCs
JobConf conf = mr.createJobConf();
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
- RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
+ RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
// Checking that the Job got failed
- assertEquals(job.getJobState(), JobStatus.FAILED);
+ assertEquals(runningJob.getJobState(), JobStatus.FAILED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.failed);
instr.reset();
-
- job = UtilsForTests.runJobKill(conf, inDir, outDir);
+ int prevNumDumps = MockStackDumpTaskController.numStackDumps;
+ runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
// Checking that the Job got killed
- assertTrue(job.isComplete());
- assertEquals(job.getJobState(), JobStatus.KILLED);
+ assertTrue(runningJob.isComplete());
+ assertEquals(runningJob.getJobState(), JobStatus.KILLED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.killed);
+ // check that job kill does not put a stacktrace in task logs.
+ checkForStackDump(false, prevNumDumps);
+
+ // Test that a task that times out does have a stack trace
+ conf = mr.createJobConf();
+ conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
+ job.setMaxMapAttempts(1);
+ prevNumDumps = MockStackDumpTaskController.numStackDumps;
+ job.waitForCompletion(true);
+ checkForStackDump(true, prevNumDumps);
} finally {
if (mr != null) {
mr.shutdown();
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=887061&r1=887060&r2=887061&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Fri Dec 4 05:06:34 2009
@@ -607,6 +607,7 @@
conf.setJobName("test-job-fail");
conf.setMapperClass(FailMapper.class);
conf.setReducerClass(IdentityReducer.class);
+ conf.setMaxMapAttempts(1);
RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
while (!job.isComplete()) {