You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/11/17 22:27:35 UTC
svn commit: r881538 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/c++/task-controller/ src/java/org/apache/hadoop/mapred/
src/test/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Tue Nov 17 21:27:34 2009
New Revision: 881538
URL: http://svn.apache.org/viewvc?rev=881538&view=rev
Log:
Merge -r 881535:881536 from trunk onto 0.21 branch. Fixes MAPREDUCE-915.
Added:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
- copied unchanged from r881536, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
- copied unchanged from r881536, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c
hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue Nov 17 21:27:34 2009
@@ -821,3 +821,5 @@
MAPREDUCE-1147. Add map output counters to new API. (Amar Kamat via
cdouglas)
+
+ MAPREDUCE-915. The debug scripts are run as the job user. (ddas)
Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/main.c?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c Tue Nov 17 21:27:34 2009
@@ -135,6 +135,13 @@
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
+ case RUN_DEBUG_SCRIPT:
+ tt_root = argv[optind++];
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ exit_code
+ = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.c?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c Tue Nov 17 21:27:34 2009
@@ -910,25 +910,41 @@
}
/*
- * Function used to launch a task as the provided user. It does the following :
+ * Function used to launch a task JVM as the provided user by delegating to run_process_as_user with LAUNCH_TASK_JVM.
+ */
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root) {
+ return run_process_as_user(user, jobid, taskid, tt_root, LAUNCH_TASK_JVM); /* NOTE(review): run_process_as_user is defined further down; confirm a prototype is in scope (none is visible in the task-controller.h hunk) */
+}
+
+/*
+ * Function that is used as a helper to launch task JVMs and debug scripts.
+ * Not meant for launching any other process. It does the following :
* 1) Checks if the tt_root passed is found in mapreduce.cluster.local.dir
- * 2) Prepares attempt_dir and log_dir to be accessible by the child
+ * 2) Prepares attempt_dir and log_dir to be accessible by the task JVMs
* 3) Uses get_task_launcher_file to fetch the task script file path
* 4) Does an execlp on the same in order to replace the current image with
* task image.
*/
-int run_task_as_user(const char * user, const char *jobid, const char *taskid,
- const char *tt_root) {
- int exit_code = 0;
-
+int run_process_as_user(const char * user, const char * jobid,
+const char *taskid, const char *tt_root, int command) {
+ if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) {
+ return INVALID_COMMAND_PROVIDED;
+ }
if (jobid == NULL || taskid == NULL || tt_root == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
+
+ if (command == LAUNCH_TASK_JVM) {
+ fprintf(LOGFILE, "run_process_as_user launching a JVM for task :%s.\n", taskid);
+ } else if (command == RUN_DEBUG_SCRIPT) {
+ fprintf(LOGFILE, "run_process_as_user launching a debug script for task :%s.\n", taskid);
+ }
#ifdef DEBUG
- fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
- fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
- fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+ fprintf(LOGFILE, "Job-id passed to run_process_as_user : %s.\n", jobid);
+ fprintf(LOGFILE, "task-d passed to run_process_as_user : %s.\n", taskid);
+ fprintf(LOGFILE, "tt_root passed to run_process_as_user : %s.\n", tt_root);
#endif
//Check tt_root before switching the user, as reading configuration
@@ -939,9 +955,11 @@
return INVALID_TT_ROOT;
}
+ int exit_code = 0;
char *job_dir = NULL, *task_script_path = NULL;
- if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+ if (command == LAUNCH_TASK_JVM &&
+ (exit_code = initialize_task(jobid, taskid, user)) != 0) {
fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
user);
goto cleanup;
@@ -980,9 +998,14 @@
cleanup();
execlp(task_script_path, task_script_path, NULL);
if (errno != 0) {
- fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
free(task_script_path);
- exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ if (command == LAUNCH_TASK_JVM) {
+ fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
+ exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ } else if (command == RUN_DEBUG_SCRIPT) {
+ fprintf(LOGFILE, "Couldn't execute the task debug script file: %s", strerror(errno));
+ exit_code = UNABLE_TO_EXECUTE_DEBUG_SCRIPT;
+ }
}
return exit_code;
@@ -998,7 +1021,13 @@
cleanup();
return exit_code;
}
-
+/*
+ * Function used to launch a task's debug script as the provided user by delegating to run_process_as_user with RUN_DEBUG_SCRIPT (which, unlike the JVM path, skips initialize_task).
+ */
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root) {
+ return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
+}
/**
* Function used to terminate/kill a task launched by the user.
* The function sends appropriate signal to the process group
Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.h?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h Tue Nov 17 21:27:34 2009
@@ -44,6 +44,7 @@
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
+ RUN_DEBUG_SCRIPT,
};
enum errorcodes {
@@ -67,6 +68,7 @@
OUT_OF_MEMORY, //18
INITIALIZE_DISTCACHE_FAILED, //19
INITIALIZE_USER_FAILED, //20
+ UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
};
#define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -99,6 +101,9 @@
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root);
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root);
+
int initialize_user(const char *user);
int initialize_task(const char *jobid, const char *taskid, const char *user);
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Nov 17 21:27:34 2009
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* The default implementation for controlling tasks.
@@ -38,8 +39,8 @@
*
* <br/>
*
- * NOTE: This class is internal only class and not intended for users!!
*/
+@InterfaceAudience.Private
public class DefaultTaskController extends TaskController {
private static final Log LOG =
@@ -142,4 +143,18 @@
// Do nothing.
}
+ @Override
+ void runDebugScript(DebugScriptContext context) throws IOException { // spawns the script directly, without the setuid user switch used by LinuxTaskController
+ List<String> wrappedCommand = TaskLog.captureDebugOut(context.args,
+ context.stdout); // bash -c wrapper whose stdout/stderr are redirected to context.stdout
+ // run the script in context.workDir; a non-zero exit status is reported as an IOException.
+ ShellCommandExecutor shexec =
+ new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), context.workDir);
+ shexec.execute();
+ int exitCode = shexec.getExitCode();
+ if (exitCode != 0) {
+ throw new IOException("Task debug script exit with nonzero status of "
+ + exitCode + ".");
+ }
+ }
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Nov 17 21:27:34 2009
@@ -29,6 +29,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -87,6 +88,7 @@
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
+ RUN_DEBUG_SCRIPT,
}
/**
@@ -119,10 +121,12 @@
sb.append(cmdLine);
// write the command to a file in the
// task specific cache directory
- writeCommand(sb.toString(), getTaskCacheDirectory(context));
+ writeCommand(sb.toString(), getTaskCacheDirectory(context,
+ context.env.workDir));
// Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
+ List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context,
+ context.env.workDir);
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
@@ -149,7 +153,23 @@
logOutput(shExec.getOutput());
}
}
-
+
+ /**
+ * Launch the debug script process that will run as the owner of the job.
+ *
+ * The script's command line is written to the task's command file, then a
+ * setuid executable switches to the job owner and runs the debug script.
+ */
+ @Override
+ void runDebugScript(DebugScriptContext context) throws IOException {
+ String debugOut = FileUtil.makeShellPath(context.stdout);
+ String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
+ writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
+ // Call the taskcontroller (RUN_DEBUG_SCRIPT) as the job owner.
+ List<String> debugScriptArgs = buildLaunchTaskArgs(context, context.workDir);
+ runCommand(TaskCommands.RUN_DEBUG_SCRIPT, context.task.getUser(),
+ debugScriptArgs, context.workDir, null);
+ }
/**
* Helper method that runs a LinuxTaskController command
*
@@ -192,7 +212,7 @@
* @param context
* @return Argument to be used while launching Task VM
*/
- private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
+ private List<String> buildInitializeTaskArgs(TaskExecContext context) {
List<String> commandArgs = new ArrayList<String>(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
@@ -223,7 +243,7 @@
}
}
- private String getJobId(TaskControllerContext context) {
+ private String getJobId(TaskExecContext context) {
String taskId = context.task.getTaskID().toString();
TaskAttemptID tId = TaskAttemptID.forName(taskId);
String jobId = tId.getJobID().toString();
@@ -237,15 +257,16 @@
* @param context
* @return Argument to be used while launching Task VM
*/
- private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+ private List<String> buildLaunchTaskArgs(TaskExecContext context,
+ File workDir) {
List<String> commandArgs = new ArrayList<String>(3);
LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
+ + getTaskCacheDirectory(context, workDir));
LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
+ new File(getTaskCacheDirectory(context, workDir)),
context) );
commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
+ new File(getTaskCacheDirectory(context, workDir)),
context));
commandArgs.addAll(buildInitializeTaskArgs(context));
return commandArgs;
@@ -255,7 +276,7 @@
// in Configs.LOCAL_DIR chosen for storing data pertaining to
// this task.
private String getDirectoryChosenForTask(File directory,
- TaskControllerContext context) {
+ TaskExecContext context) {
String jobId = getJobId(context);
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
@@ -322,12 +343,13 @@
}
// Return the task specific directory under the cache.
- private String getTaskCacheDirectory(TaskControllerContext context) {
+ private String getTaskCacheDirectory(TaskExecContext context,
+ File workDir) {
// In the case of JVM reuse, the task specific directory
// is different from what is set with respect with
// env.workDir. Hence building this from the taskId everytime.
String taskId = context.task.getTaskID().toString();
- File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+ File cacheDirForJob = workDir.getParentFile().getParentFile();
if(context.task.isTaskCleanupTask()) {
taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
}
@@ -345,6 +367,9 @@
PrintWriter pw = null;
String commandFile = directory + File.separator + COMMAND_FILE;
LOG.info("Writing commands to " + commandFile);
+ LOG.info("--------Commands Begin--------");
+ LOG.info(cmdLine);
+ LOG.info("--------Commands End--------");
try {
FileWriter fw = new FileWriter(commandFile);
BufferedWriter bw = new BufferedWriter(fw);
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java Tue Nov 17 21:27:34 2009
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* Controls initialization, finalization and clean up of tasks, and
@@ -40,9 +42,8 @@
* performing the actual actions.
*
* <br/>
- *
- * NOTE: This class is internal only class and not intended for users!!
*/
+@InterfaceAudience.Private
public abstract class TaskController implements Configurable {
private Configuration conf;
@@ -171,12 +172,14 @@
abstract void initializeTask(TaskControllerContext context)
throws IOException;
+ static class TaskExecContext { // common base of TaskControllerContext and DebugScriptContext
+ // task on whose behalf the task JVM or debug script is run
+ Task task;
+ }
/**
* Contains task information required for the task controller.
*/
- static class TaskControllerContext {
- // task being executed
- Task task;
+ static class TaskControllerContext extends TaskExecContext {
ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
// Information used only when this context is used for launching new tasks.
@@ -199,6 +202,12 @@
static class JobInitializationContext extends InitializationContext {
JobID jobid;
}
+
+ static class DebugScriptContext extends TaskExecContext { // inputs for TaskController.runDebugScript
+ List<String> args; // debug script command followed by its arguments
+ File workDir; // directory the script runs in; TaskTracker leaves it null if the task work dir could not be located
+ File stdout; // file that receives the script's stdout and stderr
+ }
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
@@ -223,4 +232,14 @@
*/
public abstract void initializeUser(InitializationContext context)
throws IOException;
+
+ /**
+ * Launch the job's debug script for the task described by the context.
+ *
+ * @param context the task, script arguments, working directory and output file
+ * @throws IOException if the script cannot be run; the default controller also throws on a non-zero exit status
+ */
+ abstract void runDebugScript(DebugScriptContext context)
+ throws IOException;
+
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Nov 17 21:27:34 2009
@@ -561,6 +561,37 @@
}
/**
+ * Construct the bash command line that runs the debug script with its output
+ * redirected: "exec <script> <args> < /dev/null >debugout 2>&1".
+ * @param cmd The debug command followed by its arguments; the first element
+ *            is converted to a shell path so the shell can find it
+ * @param debugout Shell path of the file that receives stdout and stderr
+ * @return the command line as a String
+ * @throws IOException if the executable cannot be converted to a shell path
+ */
+ static String buildDebugScriptCommandLine(List<String> cmd, String debugout)
+ throws IOException {
+ StringBuilder mergedCmd = new StringBuilder();
+ mergedCmd.append("exec ");
+ boolean isExecutable = true;
+ for(String s: cmd) {
+ if (isExecutable) {
+ // the executable name needs to be expressed as a shell path for the
+ // shell to find it.
+ mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+ isExecutable = false;
+ } else {
+ mergedCmd.append(s); // NOTE(review): appended unquoted; arguments with spaces or shell metacharacters are re-split/interpreted by bash
+ }
+ mergedCmd.append(" ");
+ }
+ mergedCmd.append(" < /dev/null ");
+ mergedCmd.append(" >");
+ mergedCmd.append(debugout);
+ mergedCmd.append(" 2>&1 ");
+ return mergedCmd.toString();
+ }
+ /**
* Add quotes to each of the command strings and
* return as a single string
* @param cmd The command to be quoted
@@ -604,25 +635,7 @@
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
- StringBuffer mergedCmd = new StringBuffer();
- mergedCmd.append("exec ");
- boolean isExecutable = true;
- for(String s: cmd) {
- if (isExecutable) {
- // the executable name needs to be expressed as a shell path for the
- // shell to find it.
- mergedCmd.append(FileUtil.makeShellPath(new File(s)));
- isExecutable = false;
- } else {
- mergedCmd.append(s);
- }
- mergedCmd.append(" ");
- }
- mergedCmd.append(" < /dev/null ");
- mergedCmd.append(" >");
- mergedCmd.append(debugout);
- mergedCmd.append(" 2>&1 ");
- result.add(mergedCmd.toString());
+ result.add(buildDebugScriptCommandLine(cmd, debugout));
return result;
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 17 21:27:34 2009
@@ -65,6 +65,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
@@ -2255,84 +2256,7 @@
setTaskFailState(true);
// call the script here for the failed tasks.
if (debugCommand != null) {
- String taskStdout ="";
- String taskStderr ="";
- String taskSyslog ="";
- String jobConf = task.getJobFile();
- try {
- // get task's stdout file
- taskStdout = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDOUT));
- // get task's stderr file
- taskStderr = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.STDERR));
- // get task's syslog file
- taskSyslog = FileUtil.makeShellPath(
- TaskLog.getRealTaskLogFileLocation
- (task.getTaskID(), TaskLog.LogName.SYSLOG));
- } catch(IOException e){
- LOG.warn("Exception finding task's stdout/err/syslog files");
- }
- File workDir = null;
- try {
- workDir =
- new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getUser(), task
- .getJobID().toString(), task.getTaskID()
- .toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- localJobConf).toString());
- } catch (IOException e) {
- LOG.warn("Working Directory of the task " + task.getTaskID() +
- " doesnt exist. Caught exception " +
- StringUtils.stringifyException(e));
- }
- // Build the command
- File stdout = TaskLog.getRealTaskLogFileLocation(
- task.getTaskID(), TaskLog.LogName.DEBUGOUT);
- // add pipes program as argument if it exists.
- String program ="";
- String executable = Submitter.getExecutable(localJobConf);
- if ( executable != null) {
- try {
- program = new URI(executable).getFragment();
- } catch (URISyntaxException ur) {
- LOG.warn("Problem in the URI fragment for pipes executable");
- }
- }
- String [] debug = debugCommand.split(" ");
- Vector<String> vargs = new Vector<String>();
- for (String component : debug) {
- vargs.add(component);
- }
- vargs.add(taskStdout);
- vargs.add(taskStderr);
- vargs.add(taskSyslog);
- vargs.add(jobConf);
- vargs.add(program);
- try {
- List<String> wrappedCommand = TaskLog.captureDebugOut
- (vargs, stdout);
- // run the script.
- try {
- runScript(wrappedCommand, workDir);
- } catch (IOException ioe) {
- LOG.warn("runScript failed with: " + StringUtils.
- stringifyException(ioe));
- }
- } catch(IOException e) {
- LOG.warn("Error in preparing wrapped debug command");
- }
-
- // add all lines of debug out to diagnostics
- try {
- int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES, -1);
- addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
- } catch(IOException ioe) {
- LOG.warn("Exception in add diagnostics!");
- }
+ runDebugScript();
}
}
taskStatus.setProgress(0.0f);
@@ -2360,21 +2284,84 @@
}
-
- /**
- * Runs the script given in args
- * @param args script name followed by its argumnets
- * @param dir current working directory.
- * @throws IOException
- */
- public void runScript(List<String> args, File dir) throws IOException {
- ShellCommandExecutor shexec =
- new ShellCommandExecutor(args.toArray(new String[0]), dir);
- shexec.execute();
- int exitCode = shexec.getExitCode();
- if (exitCode != 0) {
- throw new IOException("Task debug script exit with nonzero status of "
- + exitCode + ".");
+ private void runDebugScript() { // runs the job's debug script for this failed attempt and adds its output to the diagnostics
+ String taskStdout ="";
+ String taskStderr ="";
+ String taskSyslog ="";
+ String jobConf = task.getJobFile();
+ try {
+ // get task's stdout file
+ taskStdout = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.STDOUT));
+ // get task's stderr file
+ taskStderr = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.STDERR));
+ // get task's syslog file
+ taskSyslog = FileUtil.makeShellPath(
+ TaskLog.getRealTaskLogFileLocation
+ (task.getTaskID(), TaskLog.LogName.SYSLOG));
+ } catch(IOException e){
+ LOG.warn("Exception finding task's stdout/err/syslog files");
+ }
+ File workDir = null;
+ try {
+ workDir =
+ new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getLocalTaskDir(task.getUser(), task
+ .getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR,
+ localJobConf).toString());
+ } catch (IOException e) {
+ LOG.warn("Working Directory of the task " + task.getTaskID() +
+ " doesnt exist. Caught exception " +
+ StringUtils.stringifyException(e));
+ }
+ // Build the command
+ File stdout = TaskLog.getRealTaskLogFileLocation(
+ task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+ // add pipes program as argument if it exists.
+ String program ="";
+ String executable = Submitter.getExecutable(localJobConf);
+ if ( executable != null) {
+ try {
+ program = new URI(executable).getFragment();
+ } catch (URISyntaxException ur) {
+ LOG.warn("Problem in the URI fragment for pipes executable");
+ }
+ }
+ String [] debug = debugCommand.split(" ");
+ List<String> vargs = new ArrayList<String>();
+ for (String component : debug) {
+ vargs.add(component);
+ }
+ vargs.add(taskStdout);
+ vargs.add(taskStderr);
+ vargs.add(taskSyslog);
+ vargs.add(jobConf);
+ vargs.add(program == null ? "" : program); // URI.getFragment() is null when there is no fragment; avoid passing the literal "null"
+ DebugScriptContext context =
+ new TaskController.DebugScriptContext();
+ context.args = vargs;
+ context.stdout = stdout;
+ context.workDir = workDir; // NOTE(review): null if the lookup above failed; LinuxTaskController dereferences it (getTaskCacheDirectory)
+ context.task = task;
+ try {
+ getTaskController().runDebugScript(context);
+ } catch (IOException ie) {
+ LOG.warn("runDebugScript failed with: " + StringUtils.
+ stringifyException(ie));
+ }
+ // add all lines of debug out to diagnostics, even when the script failed
+ try {
+ int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES,
+ -1);
+ addDiagnostics(FileUtil.makeShellPath(stdout),num,
+ "DEBUG OUT");
+ } catch(IOException ioe) {
+ LOG.warn("Exception in add diagnostics!");
}
}
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Tue Nov 17 21:27:34 2009
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapreduce.TaskType;
-
-/**
- * Class to test mapred debug Script
- */
-public class TestMiniMRMapRedDebugScript extends TestCase {
- private static final Log LOG =
- LogFactory.getLog(TestMiniMRMapRedDebugScript.class.getName());
-
- private MiniMRCluster mr;
- private MiniDFSCluster dfs;
- private FileSystem fileSys;
-
- /**
- * Fail map class
- */
- public static class MapClass extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, IntWritable> {
- public void map (LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
- System.err.println("Bailing out");
- throw new IOException();
- }
- }
-
- /**
- * Reads tasklog and returns it as string after trimming it.
- * @param filter Task log filter; can be STDOUT, STDERR,
- * SYSLOG, DEBUGOUT, DEBUGERR
- * @param taskId The task id for which the log has to collected
- * @param isCleanup whether the task is a cleanup attempt or not.
- * @return task log as string
- * @throws IOException
- */
- public static String readTaskLog(TaskLog.LogName filter,
- TaskAttemptID taskId,
- boolean isCleanup)
- throws IOException {
- // string buffer to store task log
- StringBuffer result = new StringBuffer();
- int res;
-
- // reads the whole tasklog into inputstream
- InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
- // construct string log from inputstream.
- byte[] b = new byte[65536];
- while (true) {
- res = taskLogReader.read(b);
- if (res > 0) {
- result.append(new String(b));
- } else {
- break;
- }
- }
- taskLogReader.close();
-
- // trim the string and return it
- String str = result.toString();
- str = str.trim();
- return str;
- }
-
- /**
- * Launches failed map task and debugs the failed task
- * @param conf configuration for the mapred job
- * @param inDir input path
- * @param outDir output path
- * @param debugDir debug directory where script is present
- * @param debugScript The command used to execute the debug script
- * @param input Input text
- * @return the output of debug script (the DEBUGOUT log of attempt 0 of map 0)
- * @throws IOException if setting up input, output or the cached script fails
- */
- public String launchFailMapAndDebug(JobConf conf,
- Path inDir,
- Path outDir,
- Path debugDir,
- String debugScript,
- String input)
- throws IOException {
-
- // set up the input file system and write input text.
- FileSystem inFs = inDir.getFileSystem(conf);
- FileSystem outFs = outDir.getFileSystem(conf);
- outFs.delete(outDir, true);
- if (!inFs.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
- {
- // write input into input file
- DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
- file.writeBytes(input);
- file.close();
- }
-
- // configure the mapred Job for failing map task.
- conf.setJobName("failmap");
- conf.setMapperClass(MapClass.class);
- conf.setReducerClass(IdentityReducer.class);
- conf.setNumMapTasks(1);
- conf.setNumReduceTasks(0);
- conf.setMapDebugScript(debugScript);
- FileInputFormat.setInputPaths(conf, inDir);
- FileOutputFormat.setOutputPath(conf, outDir);
- String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
- "/tmp")).toString().replace(' ', '+');
- conf.set("test.build.data", TEST_ROOT_DIR);
-
- // copy debug script to cache from local file system.
- FileSystem debugFs = debugDir.getFileSystem(conf);
- Path scriptPath = new Path(debugDir,"testscript.txt");
- Path cachePath = new Path("/cacheDir");
- if (!debugFs.mkdirs(cachePath)) {
- throw new IOException("Mkdirs failed to create " + cachePath.toString());
- }
- debugFs.copyFromLocalFile(scriptPath,cachePath);
-
- // The "#testscript" URI fragment names the symlink the debug script
- // command refers to.
- URI uri = debugFs.getUri().resolve(cachePath+"/testscript.txt#testscript");
- DistributedCache.createSymlink(conf);
- DistributedCache.addCacheFile(uri, conf);
-
- RunningJob job =null;
- // run the job. Its map task fails with IOException (see MapClass).
- // NOTE(review): if submitJob itself throws, the exception is only logged,
- // job stays null and job.getID() below throws NullPointerException.
- try {
- job = new JobClient(conf).submitJob(conf);
- } catch (IOException e) {
- LOG.info("Running Job failed", e);
- }
-
- JobID jobId = job.getID();
- // construct the task id of first map task of failmap
- TaskAttemptID taskId = new TaskAttemptID(
- new TaskID(jobId,TaskType.MAP, 0), 0);
- // wait for the job to finish.
- // NOTE(review): busy-wait with no sleep; spins a CPU until completion.
- while (!job.isComplete()) ;
-
- // return the output of debugout log.
- return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
- }
-
- /**
- * Tests Map task's debug script
- *
- * In this test, we launch a mapreduce program which
- * writes 'Bailing out' to stderr and throws an exception.
- * We will run the script when the task fails and validate
- * the output of debug out log.
- *
- */
- public void testMapDebugScript() throws Exception {
- try {
-
- // create configuration, dfs, file system and mapred cluster
- Configuration cnf = new Configuration();
- dfs = new MiniDFSCluster(cnf, 1, true, null);
- fileSys = dfs.getFileSystem();
- mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
- JobConf conf = mr.createJobConf();
-
- // initialize input, output and debug directories
- final Path debugDir = new Path("build/test/debug");
- Path inDir = new Path("testing/wc/input");
- Path outDir = new Path("testing/wc/output");
-
- // initialize debug command and input text
- // ("./testscript" is the symlink name given by the "#testscript"
- // fragment of the cache URI built in launchFailMapAndDebug)
- String debugScript = "./testscript";
- String input = "The input";
-
- // Launch failed map task and run debug script
- String result = launchFailMapAndDebug(conf,inDir,
- outDir,debugDir, debugScript, input);
-
- // Assert the output of debug script.
- // Presumably the script prints "Test Script" followed by the task's
- // stderr ("Bailing out") -- confirm against testscript.txt.
- assertTrue(result.contains("Test Script\nBailing out"));
-
- } finally {
- // close file system and shut down dfs and mapred cluster
- // NOTE(review): a single try covers all three calls, so an IOException
- // from an earlier call skips the remaining shutdowns.
- try {
- if (fileSys != null) {
- fileSys.close();
- }
- if (dfs != null) {
- dfs.shutdown();
- }
- if (mr != null) {
- mr.shutdown();
- }
- } catch (IOException ioe) {
- LOG.info("IO exception in closing file system:"+ioe.getMessage(), ioe);
- }
- }
- }
-
- /**
- * Runs testMapDebugScript outside a test runner; any exception is
- * logged rather than propagated.
- * @param args ignored
- */
- public static void main(String args[]){
- TestMiniMRMapRedDebugScript tmds = new TestMiniMRMapRedDebugScript();
- try {
- tmds.testMapDebugScript();
- } catch (Exception e) {
- LOG.error("Exception in test: "+e.getMessage(), e);
- }
- }
-
-}
-
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Tue Nov 17 21:27:34 2009
@@ -19,6 +19,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import junit.framework.TestCase;
@@ -118,7 +119,7 @@
assertTrue(ts != null);
assertEquals(TaskStatus.State.FAILED, ts.getRunState());
// validate tasklogs for task attempt
- String log = TestMiniMRMapRedDebugScript.readTaskLog(
+ String log = readTaskLog(
TaskLog.LogName.STDERR, attemptId, false);
assertTrue(log.contains(taskLog));
if (!isCleanup) {
@@ -127,12 +128,49 @@
assertTrue(log.contains(cleanupLog));
} else {
// validate tasklogs for cleanup attempt
- log = TestMiniMRMapRedDebugScript.readTaskLog(
+ log = readTaskLog(
TaskLog.LogName.STDERR, attemptId, true);
assertTrue(log.contains(cleanupLog));
}
}
+  /**
+   * Reads the whole task log and returns it as a string after trimming it.
+   * @param filter Task log filter; can be STDOUT, STDERR,
+   *               SYSLOG, DEBUGOUT, DEBUGERR
+   * @param taskId The task id for which the log has to be collected
+   * @param isCleanup whether the task is a cleanup attempt or not.
+   * @return task log as string, with leading/trailing whitespace removed
+   * @throws IOException if the task log cannot be opened or read
+   */
+  private String readTaskLog(TaskLog.LogName filter,
+                             TaskAttemptID taskId,
+                             boolean isCleanup)
+  throws IOException {
+    StringBuilder result = new StringBuilder();
+    // reads the whole tasklog (start 0, end -1) into inputstream
+    InputStream taskLogReader =
+      new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
+    try {
+      byte[] b = new byte[65536];
+      int res;
+      while ((res = taskLogReader.read(b)) > 0) {
+        // Decode only the bytes actually read by this call; decoding the
+        // whole buffer would append stale bytes left from earlier reads.
+        result.append(new String(b, 0, res));
+      }
+    } finally {
+      // Release the reader even if read() throws.
+      taskLogReader.close();
+    }
+    // trim the string and return it
+    return result.toString().trim();
+  }
+
private void validateJob(RunningJob job, MiniMRCluster mr)
throws IOException {
assertEquals(JobStatus.SUCCEEDED, job.getJobState());
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Nov 17 21:27:34 2009
@@ -665,7 +665,8 @@
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
-
+ // NOTE: the next line is required for the TestDebugScript test to succeed;
+ // presumably that test looks for this text in the task's stderr -- confirm.
+ System.err.println("failing map");
throw new RuntimeException("failing map");
}
}