Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/11/17 22:27:35 UTC

svn commit: r881538 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/c++/task-controller/ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/

Author: ddas
Date: Tue Nov 17 21:27:34 2009
New Revision: 881538

URL: http://svn.apache.org/viewvc?rev=881538&view=rev
Log:
Merge -r 881535:881536 from trunk onto 0.21 branch. Fixes MAPREDUCE-915.

Added:
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
      - copied unchanged from r881536, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
      - copied unchanged from r881536, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestDebugScriptWithLinuxTaskController.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c
    hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Tue Nov 17 21:27:34 2009
@@ -821,3 +821,5 @@
 
     MAPREDUCE-1147. Add map output counters to new API. (Amar Kamat via
     cdouglas)
+
+    MAPREDUCE-915. The debug scripts are run as the job user. (ddas)

Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/main.c?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/main.c Tue Nov 17 21:27:34 2009
@@ -135,6 +135,13 @@
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
+  case RUN_DEBUG_SCRIPT:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code
+        = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

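The new RUN_DEBUG_SCRIPT case picks up three positional arguments (tt_root, job id, task id) left in argv once the user and command have been parsed. For orientation, the Java side is expected to hand the setuid binary an argument vector shaped roughly as below. This is a sketch only: the binary path, user, and ids are placeholders, and the ordinal encoding of the command is an assumption about how buildTaskControllerExecutor serializes the TaskCommands enum.

    // Hypothetical argv assembly for a debug-script launch (illustrative only).
    List<String> argv = new ArrayList<String>();
    argv.add("/usr/local/hadoop/bin/task-controller"); // assumed install path
    argv.add("jobowner");                              // user to switch to
    argv.add(String.valueOf(                           // command id; assumed to
        TaskCommands.RUN_DEBUG_SCRIPT.ordinal()));     // travel as the ordinal
    argv.add(ttRoot);   // the mapreduce.cluster.local.dir entry chosen for the task
    argv.add(jobId);    // e.g. "job_200911172127_0001" (placeholder)
    argv.add(taskId);   // e.g. "attempt_200911172127_0001_m_000000_0" (placeholder)
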
Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.c?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.c Tue Nov 17 21:27:34 2009
@@ -910,25 +910,41 @@
 }
 
 /*
- * Function used to launch a task as the provided user. It does the following :
+ * Function used to launch a task as the provided user.
+ */
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root) {
+  return run_process_as_user(user, jobid, taskid, tt_root, LAUNCH_TASK_JVM);
+}
+
+/*
+ * Function that is used as a helper to launch task JVMs and debug scripts.
+ * Not meant for launching any other process. It does the following :
  * 1) Checks if the tt_root passed is found in mapreduce.cluster.local.dir
- * 2) Prepares attempt_dir and log_dir to be accessible by the child
+ * 2) Prepares attempt_dir and log_dir to be accessible by the task JVMs
  * 3) Uses get_task_launcher_file to fetch the task script file path
  * 4) Does an execlp on the same in order to replace the current image with
  * task image.
  */
-int run_task_as_user(const char * user, const char *jobid, const char *taskid,
-    const char *tt_root) {
-  int exit_code = 0;
-
+int run_process_as_user(const char * user, const char * jobid, 
+const char *taskid, const char *tt_root, int command) {
+  if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) {
+    return INVALID_COMMAND_PROVIDED;
+  }
   if (jobid == NULL || taskid == NULL || tt_root == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
+  
+  if (command == LAUNCH_TASK_JVM) {
+    fprintf(LOGFILE, "run_process_as_user launching a JVM for task :%s.\n", taskid);
+  } else if (command == RUN_DEBUG_SCRIPT) {
+    fprintf(LOGFILE, "run_process_as_user launching a debug script for task :%s.\n", taskid);
+  }
 
 #ifdef DEBUG
-  fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
-  fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
-  fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+  fprintf(LOGFILE, "Job-id passed to run_process_as_user : %s.\n", jobid);
+  fprintf(LOGFILE, "task-id passed to run_process_as_user : %s.\n", taskid);
+  fprintf(LOGFILE, "tt_root passed to run_process_as_user : %s.\n", tt_root);
 #endif
 
   //Check tt_root before switching the user, as reading configuration
@@ -939,9 +955,11 @@
     return INVALID_TT_ROOT;
   }
 
+  int exit_code = 0;
   char *job_dir = NULL, *task_script_path = NULL;
 
-  if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+  if (command == LAUNCH_TASK_JVM && 
+     (exit_code = initialize_task(jobid, taskid, user)) != 0) {
     fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
         user);
     goto cleanup;
@@ -980,9 +998,14 @@
   cleanup();
   execlp(task_script_path, task_script_path, NULL);
   if (errno != 0) {
-    fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
     free(task_script_path);
-    exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+    if (command == LAUNCH_TASK_JVM) {
+      fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
+      exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+    } else if (command == RUN_DEBUG_SCRIPT) {
+      fprintf(LOGFILE, "Couldn't execute the task debug script file: %s", strerror(errno));
+      exit_code = UNABLE_TO_EXECUTE_DEBUG_SCRIPT;
+    }
   }
 
   return exit_code;
@@ -998,7 +1021,13 @@
   cleanup();
   return exit_code;
 }
-
+/*
+ * Function used to launch a debug script as the provided user. 
+ */
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root) {
+  return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
+}
 /**
  * Function used to terminate/kill a task launched by the user.
  * The function sends appropriate signal to the process group

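Note that the shared helper runs initialize_task only on the LAUNCH_TASK_JVM path: by the time a debug script fires, the failed task has already run, so its attempt_dir and log_dir were prepared during the original launch and re-initialization is skipped. A compilable Java rendering of that control flow, with stubbed steps and illustrative constants, purely to show the shape:

    // Sketch of run_process_as_user's shape; constants and helpers are
    // illustrative stand-ins, not the real C functions or error codes.
    class ProcessLauncherSketch {
      static final int LAUNCH_TASK_JVM = 1;    // illustrative value
      static final int RUN_DEBUG_SCRIPT = 4;   // illustrative value
      static final int OK = 0, INVALID_COMMAND_PROVIDED = -1; // illustrative

      int runProcessAsUser(String user, String jobId, String taskId,
                           String ttRoot, int command) {
        if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) {
          return INVALID_COMMAND_PROVIDED;     // only two commands are legal here
        }
        if (command == LAUNCH_TASK_JVM) {
          initializeTask(jobId, taskId, user); // debug scripts skip this: the
        }                                      // attempt dirs already exist
        return execTaskScript(jobId, taskId, ttRoot);
      }

      void initializeTask(String jobId, String taskId, String user) { /* stub */ }
      int execTaskScript(String jobId, String taskId, String ttRoot) { return OK; }
    }
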
Modified: hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/c%2B%2B/task-controller/task-controller.h?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/c++/task-controller/task-controller.h Tue Nov 17 21:27:34 2009
@@ -44,6 +44,7 @@
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
   KILL_TASK_JVM,
+  RUN_DEBUG_SCRIPT,
 };
 
 enum errorcodes {
@@ -67,6 +68,7 @@
   OUT_OF_MEMORY, //18
   INITIALIZE_DISTCACHE_FAILED, //19
   INITIALIZE_USER_FAILED, //20
+  UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
 };
 
 #define USER_DIR_PATTERN "%s/taskTracker/%s"
@@ -99,6 +101,9 @@
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root);
 
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root);
+
 int initialize_user(const char *user);
 
 int initialize_task(const char *jobid, const char *taskid, const char *user);

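Both this C enum and the TaskCommands enum in LinuxTaskController.java grow RUN_DEBUG_SCRIPT immediately after KILL_TASK_JVM. If, as assumed above, the command crosses the process boundary as an integer, the two declarations have to stay in the same order; a small guard test (hypothetical, not part of this commit) could pin the relative position on the Java side:

    // Hypothetical guard test; TaskCommands is package-private, so this
    // would live in org.apache.hadoop.mapred.
    import static org.junit.Assert.assertEquals;
    import org.junit.Test;

    public class TestTaskCommandOrder {
      @Test
      public void runDebugScriptFollowsKillTaskJvm() {
        assertEquals(LinuxTaskController.TaskCommands.KILL_TASK_JVM.ordinal() + 1,
            LinuxTaskController.TaskCommands.RUN_DEBUG_SCRIPT.ordinal());
      }
    }
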
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Nov 17 21:27:34 2009
@@ -28,6 +28,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * The default implementation for controlling tasks.
@@ -38,8 +39,8 @@
  * 
  * <br/>
  * 
- *  NOTE: This class is internal only class and not intended for users!!
  */
+@InterfaceAudience.Private
 public class DefaultTaskController extends TaskController {
 
   private static final Log LOG = 
@@ -142,4 +143,18 @@
     // Do nothing.
   }
   
+  @Override
+  void runDebugScript(DebugScriptContext context) throws IOException {
+    List<String>  wrappedCommand = TaskLog.captureDebugOut(context.args, 
+        context.stdout);
+    // run the script.
+    ShellCommandExecutor shexec = 
+      new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), context.workDir);
+    shexec.execute();
+    int exitCode = shexec.getExitCode();
+    if (exitCode != 0) {
+      throw new IOException("Task debug script exit with nonzero status of " 
+          + exitCode + ".");
+    }
+  }
 }

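In the default (non-setuid) controller the script simply runs as the TaskTracker user: TaskLog.captureDebugOut wraps the raw argument list in a bash -c command line that redirects everything into the debugout file, and ShellCommandExecutor runs it from the task's working directory. A caller-side sketch, with placeholder names and paths, of how the new DebugScriptContext is meant to be filled in (the real call site is in TaskTracker.java further down):

    // Illustrative only; the script name, paths, and variables are placeholders.
    TaskController.DebugScriptContext ctx =
        new TaskController.DebugScriptContext();
    ctx.task = task;                              // the failed task attempt
    ctx.args = Arrays.asList("./debugscript",     // script followed by the
        taskStdout, taskStderr, taskSyslog,       // log files and job conf
        jobConfPath);                             // it gets to inspect
    ctx.workDir = attemptWorkDir;                 // .../<attempt-id>/work
    ctx.stdout = debugOutFile;                    // TaskLog DEBUGOUT location
    taskController.runDebugScript(ctx);
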
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Nov 17 21:27:34 2009
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -87,6 +88,7 @@
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
     KILL_TASK_JVM,
+    RUN_DEBUG_SCRIPT,
   }
 
   /**
@@ -119,10 +121,12 @@
     sb.append(cmdLine);
     // write the command to a file in the
     // task specific cache directory
-    writeCommand(sb.toString(), getTaskCacheDirectory(context));
+    writeCommand(sb.toString(), getTaskCacheDirectory(context, 
+        context.env.workDir));
     
     // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, 
+        context.env.workDir);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
@@ -149,7 +153,23 @@
       logOutput(shExec.getOutput());
     }
   }
-
+  
+  /**
+   * Launch the debug script process that will run as the owner of the job.
+   * 
+   * This method launches the task debug script process by executing a setuid
+   * executable that will switch to the user and run the task. 
+   */
+  @Override
+  void runDebugScript(DebugScriptContext context) throws IOException {
+    String debugOut = FileUtil.makeShellPath(context.stdout);
+    String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
+    writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
+    runCommand(TaskCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
+        launchTaskJVMArgs, context.workDir, null);
+  }
   /**
    * Helper method that runs a LinuxTaskController command
    * 
@@ -192,7 +212,7 @@
    * @param context
    * @return Argument to be used while launching Task VM
    */
-  private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
+  private List<String> buildInitializeTaskArgs(TaskExecContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
@@ -223,7 +243,7 @@
     }
   }
 
-  private String getJobId(TaskControllerContext context) {
+  private String getJobId(TaskExecContext context) {
     String taskId = context.task.getTaskID().toString();
     TaskAttemptID tId = TaskAttemptID.forName(taskId);
     String jobId = tId.getJobID().toString();
@@ -237,15 +257,16 @@
    * @param context
    * @return Argument to be used while launching Task VM
    */
-  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+  private List<String> buildLaunchTaskArgs(TaskExecContext context, 
+      File workDir) {
     List<String> commandArgs = new ArrayList<String>(3);
     LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context));
+        + getTaskCacheDirectory(context, workDir));
     LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
+        new File(getTaskCacheDirectory(context, workDir)), 
         context) );
     commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
+        new File(getTaskCacheDirectory(context, workDir)), 
         context));
     commandArgs.addAll(buildInitializeTaskArgs(context));
     return commandArgs;
@@ -255,7 +276,7 @@
   // in Configs.LOCAL_DIR chosen for storing data pertaining to
   // this task.
   private String getDirectoryChosenForTask(File directory,
-      TaskControllerContext context) {
+      TaskExecContext context) {
     String jobId = getJobId(context);
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
@@ -322,12 +343,13 @@
   }
   
   // Return the task specific directory under the cache.
-  private String getTaskCacheDirectory(TaskControllerContext context) {
+  private String getTaskCacheDirectory(TaskExecContext context, 
+      File workDir) {
     // In the case of JVM reuse, the task specific directory
     // is different from what is set with respect with
     // env.workDir. Hence building this from the taskId everytime.
     String taskId = context.task.getTaskID().toString();
-    File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+    File cacheDirForJob = workDir.getParentFile().getParentFile();
     if(context.task.isTaskCleanupTask()) {
       taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
     }
@@ -345,6 +367,9 @@
     PrintWriter pw = null;
     String commandFile = directory + File.separator + COMMAND_FILE;
     LOG.info("Writing commands to " + commandFile);
+    LOG.info("--------Commands Begin--------");
+    LOG.info(cmdLine);
+    LOG.info("--------Commands End--------");
     try {
       FileWriter fw = new FileWriter(commandFile);
       BufferedWriter bw = new BufferedWriter(fw);

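For the setuid controller the flow mirrors a normal task launch: the fully redirected command line is written to the per-task command file in the cache directory, and the task-controller binary is then invoked with RUN_DEBUG_SCRIPT so that the script executes with the job owner's uid rather than the TaskTracker's. With placeholder paths, the single line that lands in the command file looks like this:

    // What writeCommand() stores for a debug script (sketch; the line is
    // produced by TaskLog.buildDebugScriptCommandLine, paths are placeholders).
    String commandFileLine =
        "exec ./debugscript stdout.txt stderr.txt syslog.txt job.xml"
        + "  < /dev/null  >/logs/userlogs/<attempt-id>/debugout 2>&1";
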
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskController.java Tue Nov 17 21:27:34 2009
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Controls initialization, finalization and clean up of tasks, and
@@ -40,9 +42,8 @@
  * performing the actual actions.
  * 
  * <br/>
- * 
- * NOTE: This class is internal only class and not intended for users!!
  */
+@InterfaceAudience.Private
 public abstract class TaskController implements Configurable {
   
   private Configuration conf;
@@ -171,12 +172,14 @@
   abstract void initializeTask(TaskControllerContext context)
       throws IOException;
 
+  static class TaskExecContext {
+    // task being executed
+    Task task;
+  }
   /**
    * Contains task information required for the task controller.  
    */
-  static class TaskControllerContext {
-    // task being executed
-    Task task;
+  static class TaskControllerContext extends TaskExecContext {
     ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
 
     // Information used only when this context is used for launching new tasks.
@@ -199,6 +202,12 @@
   static class JobInitializationContext extends InitializationContext {
     JobID jobid;
   }
+  
+  static class DebugScriptContext extends TaskExecContext {
+    List<String> args;
+    File workDir;
+    File stdout;
+  }
 
   /**
    * Sends a graceful terminate signal to taskJVM and it sub-processes. 
@@ -223,4 +232,14 @@
    */
   public abstract void initializeUser(InitializationContext context)
       throws IOException;
+  
+  /**
+   * Launch the task debug script
+   * 
+   * @param context
+   * @throws IOException
+   */
+  abstract void runDebugScript(DebugScriptContext context) 
+      throws IOException;
+  
 }

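Any other TaskController implementation now has to supply runDebugScript as well. A minimal conforming subclass, purely hypothetical, that decorates the default behaviour with logging (the hook is package-private, so such a class must sit in org.apache.hadoop.mapred):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // Hypothetical subclass, for illustration only; not part of this commit.
    public class LoggingTaskController extends DefaultTaskController {
      private static final Log LOG =
          LogFactory.getLog(LoggingTaskController.class);

      @Override
      void runDebugScript(DebugScriptContext context) throws IOException {
        LOG.info("Running debug script for " + context.task.getTaskID()
            + " in " + context.workDir);
        super.runDebugScript(context);   // default bash -c execution
      }
    }
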
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Nov 17 21:27:34 2009
@@ -561,6 +561,37 @@
   }
   
   /**
+   * Construct the command line for running the debug script
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the command line as a String
+   * @throws IOException
+   */
+  static String buildDebugScriptCommandLine(List<String> cmd, String debugout)
+  throws IOException {
+    StringBuilder mergedCmd = new StringBuilder();
+    mergedCmd.append("exec ");
+    boolean isExecutable = true;
+    for(String s: cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the  
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false; 
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    return mergedCmd.toString();
+  }
+  /**
    * Add quotes to each of the command strings and
    * return as a single string 
    * @param cmd The command to be quoted
@@ -604,25 +635,7 @@
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
-    StringBuffer mergedCmd = new StringBuffer();
-    mergedCmd.append("exec ");
-    boolean isExecutable = true;
-    for(String s: cmd) {
-      if (isExecutable) {
-        // the executable name needs to be expressed as a shell path for the  
-        // shell to find it.
-        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
-        isExecutable = false; 
-      } else {
-        mergedCmd.append(s);
-      }
-      mergedCmd.append(" ");
-    }
-    mergedCmd.append(" < /dev/null ");
-    mergedCmd.append(" >");
-    mergedCmd.append(debugout);
-    mergedCmd.append(" 2>&1 ");
-    result.add(mergedCmd.toString());
+    result.add(buildDebugScriptCommandLine(cmd, debugout));
     return result;
   }
   

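Given the loop above, a concrete call makes the generated shell line easy to picture. A sketch (the method is package-private in TaskLog; the script and paths are placeholders, and exact whitespace is elided):

    List<String> cmd = Arrays.asList("./debugscript",
        "stdout.txt", "stderr.txt", "syslog.log", "job.xml");
    String line = TaskLog.buildDebugScriptCommandLine(cmd, "/logs/debugout");
    // line is now, modulo the shell-path expansion of the executable:
    //   exec ./debugscript stdout.txt stderr.txt syslog.log job.xml  < /dev/null  >/logs/debugout 2>&1
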
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 17 21:27:34 2009
@@ -65,6 +65,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
@@ -2255,84 +2256,7 @@
             setTaskFailState(true);
             // call the script here for the failed tasks.
             if (debugCommand != null) {
-              String taskStdout ="";
-              String taskStderr ="";
-              String taskSyslog ="";
-              String jobConf = task.getJobFile();
-              try {
-                // get task's stdout file 
-                taskStdout = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDOUT));
-                // get task's stderr file 
-                taskStderr = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDERR));
-                // get task's syslog file 
-                taskSyslog = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
-              } catch(IOException e){
-                LOG.warn("Exception finding task's stdout/err/syslog files");
-              }
-              File workDir = null;
-              try {
-                workDir =
-                    new File(lDirAlloc.getLocalPathToRead(
-                        TaskTracker.getLocalTaskDir(task.getUser(), task
-                            .getJobID().toString(), task.getTaskID()
-                            .toString(), task.isTaskCleanupTask())
-                            + Path.SEPARATOR + MRConstants.WORKDIR,
-                        localJobConf).toString());
-              } catch (IOException e) {
-                LOG.warn("Working Directory of the task " + task.getTaskID() +
-                                " doesnt exist. Caught exception " +
-                          StringUtils.stringifyException(e));
-              }
-              // Build the command  
-              File stdout = TaskLog.getRealTaskLogFileLocation(
-                                   task.getTaskID(), TaskLog.LogName.DEBUGOUT);
-              // add pipes program as argument if it exists.
-              String program ="";
-              String executable = Submitter.getExecutable(localJobConf);
-              if ( executable != null) {
-            	try {
-            	  program = new URI(executable).getFragment();
-            	} catch (URISyntaxException ur) {
-            	  LOG.warn("Problem in the URI fragment for pipes executable");
-            	}	  
-              }
-              String [] debug = debugCommand.split(" ");
-              Vector<String> vargs = new Vector<String>();
-              for (String component : debug) {
-                vargs.add(component);
-              }
-              vargs.add(taskStdout);
-              vargs.add(taskStderr);
-              vargs.add(taskSyslog);
-              vargs.add(jobConf);
-              vargs.add(program);
-              try {
-                List<String>  wrappedCommand = TaskLog.captureDebugOut
-                                                          (vargs, stdout);
-                // run the script.
-                try {
-                  runScript(wrappedCommand, workDir);
-                } catch (IOException ioe) {
-                  LOG.warn("runScript failed with: " + StringUtils.
-                                                      stringifyException(ioe));
-                }
-              } catch(IOException e) {
-                LOG.warn("Error in preparing wrapped debug command");
-              }
-
-              // add all lines of debug out to diagnostics
-              try {
-                int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES, -1);
-                addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
-              } catch(IOException ioe) {
-                LOG.warn("Exception in add diagnostics!");
-              }
+              runDebugScript();
             }
           }
           taskStatus.setProgress(0.0f);
@@ -2360,21 +2284,84 @@
 
     }
     
-
-    /**
-     * Runs the script given in args
-     * @param args script name followed by its argumnets
-     * @param dir current working directory.
-     * @throws IOException
-     */
-    public void runScript(List<String> args, File dir) throws IOException {
-      ShellCommandExecutor shexec = 
-              new ShellCommandExecutor(args.toArray(new String[0]), dir);
-      shexec.execute();
-      int exitCode = shexec.getExitCode();
-      if (exitCode != 0) {
-        throw new IOException("Task debug script exit with nonzero status of " 
-                              + exitCode + ".");
+    private void runDebugScript() {
+      String taskStdout ="";
+      String taskStderr ="";
+      String taskSyslog ="";
+      String jobConf = task.getJobFile();
+      try {
+        // get task's stdout file 
+        taskStdout = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.STDOUT));
+        // get task's stderr file 
+        taskStderr = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.STDERR));
+        // get task's syslog file 
+        taskSyslog = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.SYSLOG));
+      } catch(IOException e){
+        LOG.warn("Exception finding task's stdout/err/syslog files");
+      }
+      File workDir = null;
+      try {
+        workDir =
+            new File(lDirAlloc.getLocalPathToRead(
+                TaskTracker.getLocalTaskDir(task.getUser(), task
+                    .getJobID().toString(), task.getTaskID()
+                    .toString(), task.isTaskCleanupTask())
+                    + Path.SEPARATOR + MRConstants.WORKDIR,
+                localJobConf).toString());
+      } catch (IOException e) {
+        LOG.warn("Working Directory of the task " + task.getTaskID() +
+                        " doesnt exist. Caught exception " +
+                  StringUtils.stringifyException(e));
+      }
+      // Build the command  
+      File stdout = TaskLog.getRealTaskLogFileLocation(
+                           task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+      // add pipes program as argument if it exists.
+      String program ="";
+      String executable = Submitter.getExecutable(localJobConf);
+      if ( executable != null) {
+        try {
+          program = new URI(executable).getFragment();
+        } catch (URISyntaxException ur) {
+          LOG.warn("Problem in the URI fragment for pipes executable");
+        }     
+      }
+      String [] debug = debugCommand.split(" ");
+      List<String> vargs = new ArrayList<String>();
+      for (String component : debug) {
+        vargs.add(component);
+      }
+      vargs.add(taskStdout);
+      vargs.add(taskStderr);
+      vargs.add(taskSyslog);
+      vargs.add(jobConf);
+      vargs.add(program);
+      DebugScriptContext context = 
+        new TaskController.DebugScriptContext();
+      context.args = vargs;
+      context.stdout = stdout;
+      context.workDir = workDir;
+      context.task = task;
+      try {
+        getTaskController().runDebugScript(context);
+        // add all lines of debug out to diagnostics
+        try {
+          int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES,
+              -1);
+          addDiagnostics(FileUtil.makeShellPath(stdout),num,
+              "DEBUG OUT");
+        } catch(IOException ioe) {
+          LOG.warn("Exception in add diagnostics!");
+        }
+      } catch (IOException ie) {
+        LOG.warn("runDebugScript failed with: " + StringUtils.
+                                              stringifyException(ie));
       }
     }
 

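Nothing changes from the job author's side: the script is configured exactly as before, and the tracker appends the standard trailing arguments in the order the vargs block above establishes (task stdout, stderr, syslog, job conf, and the pipes program when one exists). A minimal job-side sketch, with placeholder URIs, along the lines of the removed TestMiniMRMapRedDebugScript:

    // Job-side setup (sketch); cache path and script name are placeholders.
    JobConf conf = new JobConf();
    conf.setMapDebugScript("./testscript");   // run as: script stdout stderr
                                              //         syslog jobconf [program]
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(
        URI.create("hdfs:///cacheDir/testscript.txt#testscript"), conf);
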
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Tue Nov 17 21:27:34 2009
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapreduce.TaskType;
-
-/**
- * Class to test mapred debug Script
- */
-public class TestMiniMRMapRedDebugScript extends TestCase {
-  private static final Log LOG =
-    LogFactory.getLog(TestMiniMRMapRedDebugScript.class.getName());
-
-  private MiniMRCluster mr;
-  private MiniDFSCluster dfs;
-  private FileSystem fileSys;
-  
-  /**
-   * Fail map class 
-   */
-  public static class MapClass extends MapReduceBase
-  implements Mapper<LongWritable, Text, Text, IntWritable> {
-     public void map (LongWritable key, Text value, 
-                     OutputCollector<Text, IntWritable> output, 
-                     Reporter reporter) throws IOException {
-       System.err.println("Bailing out");
-       throw new IOException();
-     }
-  }
-
-  /**
-   * Reads tasklog and returns it as string after trimming it.
-   * @param filter Task log filter; can be STDOUT, STDERR,
-   *                SYSLOG, DEBUGOUT, DEBUGERR
-   * @param taskId The task id for which the log has to collected
-   * @param isCleanup whether the task is a cleanup attempt or not.
-   * @return task log as string
-   * @throws IOException
-   */
-  public static String readTaskLog(TaskLog.LogName  filter, 
-                                   TaskAttemptID taskId, 
-                                   boolean isCleanup)
-  throws IOException {
-    // string buffer to store task log
-    StringBuffer result = new StringBuffer();
-    int res;
-
-    // reads the whole tasklog into inputstream
-    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
-    // construct string log from inputstream.
-    byte[] b = new byte[65536];
-    while (true) {
-      res = taskLogReader.read(b);
-      if (res > 0) {
-        result.append(new String(b));
-      } else {
-        break;
-      }
-    }
-    taskLogReader.close();
-    
-    // trim the string and return it
-    String str = result.toString();
-    str = str.trim();
-    return str;
-  }
-
-  /**
-   * Launches failed map task and debugs the failed task
-   * @param conf configuration for the mapred job
-   * @param inDir input path
-   * @param outDir output path
-   * @param debugDir debug directory where script is present
-   * @param debugCommand The command to execute script
-   * @param input Input text
-   * @return the output of debug script 
-   * @throws IOException
-   */
-  public String launchFailMapAndDebug(JobConf conf,
-                                      Path inDir,
-                                      Path outDir,
-                                      Path debugDir,
-                                      String debugScript,
-                                      String input)
-  throws IOException {
-
-    // set up the input file system and write input text.
-    FileSystem inFs = inDir.getFileSystem(conf);
-    FileSystem outFs = outDir.getFileSystem(conf);
-    outFs.delete(outDir, true);
-    if (!inFs.mkdirs(inDir)) {
-      throw new IOException("Mkdirs failed to create " + inDir.toString());
-    }
-    {
-      // write input into input file
-      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
-      file.writeBytes(input);
-      file.close();
-    }
-
-    // configure the mapred Job for failing map task.
-    conf.setJobName("failmap");
-    conf.setMapperClass(MapClass.class);        
-    conf.setReducerClass(IdentityReducer.class);
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(0);
-    conf.setMapDebugScript(debugScript);
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-                                      "/tmp")).toString().replace(' ', '+');
-    conf.set("test.build.data", TEST_ROOT_DIR);
-
-    // copy debug script to cache from local file system.
-    FileSystem debugFs = debugDir.getFileSystem(conf);
-    Path scriptPath = new Path(debugDir,"testscript.txt");
-    Path cachePath = new Path("/cacheDir");
-    if (!debugFs.mkdirs(cachePath)) {
-      throw new IOException("Mkdirs failed to create " + cachePath.toString());
-    }
-    debugFs.copyFromLocalFile(scriptPath,cachePath);
-    
-    URI uri = debugFs.getUri().resolve(cachePath+"/testscript.txt#testscript");
-    DistributedCache.createSymlink(conf);
-    DistributedCache.addCacheFile(uri, conf);
-
-    RunningJob job =null;
-    // run the job. It will fail with IOException.
-    try {
-      job = new JobClient(conf).submitJob(conf);
-    } catch (IOException e) {
-    	LOG.info("Running Job failed", e);
-    }
-
-    JobID jobId = job.getID();
-    // construct the task id of first map task of failmap
-    TaskAttemptID taskId = new TaskAttemptID(
-        new TaskID(jobId,TaskType.MAP, 0), 0);
-    // wait for the job to finish.
-    while (!job.isComplete()) ;
-    
-    // return the output of debugout log.
-    return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
-  }
-
-  /**
-   * Tests Map task's debug script
-   * 
-   * In this test, we launch a mapreduce program which 
-   * writes 'Bailing out' to stderr and throws an exception.
-   * We will run the script when tsk fails and validate 
-   * the output of debug out log. 
-   *
-   */
-  public void testMapDebugScript() throws Exception {
-    try {
-      
-      // create configuration, dfs, file system and mapred cluster 
-      Configuration cnf = new Configuration();
-      dfs = new MiniDFSCluster(cnf, 1, true, null);
-      fileSys = dfs.getFileSystem();
-      mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
-      JobConf conf = mr.createJobConf();
-      
-      // intialize input, output and debug directories
-      final Path debugDir = new Path("build/test/debug");
-      Path inDir = new Path("testing/wc/input");
-      Path outDir = new Path("testing/wc/output");
-      
-      // initialize debug command and input text
-      String debugScript = "./testscript";
-      String input = "The input";
-      
-      // Launch failed map task and run debug script
-      String result = launchFailMapAndDebug(conf,inDir, 
-                               outDir,debugDir, debugScript, input);
-      
-      // Assert the output of debug script.
-      assertTrue(result.contains("Test Script\nBailing out"));
-
-    } finally {  
-      // close file system and shut down dfs and mapred cluster
-      try {
-        if (fileSys != null) {
-          fileSys.close();
-        }
-        if (dfs != null) {
-          dfs.shutdown();
-        }
-        if (mr != null) {
-          mr.shutdown();
-        }
-      } catch (IOException ioe) {
-        LOG.info("IO exception in closing file system:"+ioe.getMessage(), ioe);
-      }
-    }
-  }
-
-  public static void main(String args[]){
-    TestMiniMRMapRedDebugScript tmds = new TestMiniMRMapRedDebugScript();
-    try {
-      tmds.testMapDebugScript();
-    } catch (Exception e) {
-      LOG.error("Exception in test: "+e.getMessage(), e);
-    }
-  }
-  
-}
-

Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Tue Nov 17 21:27:34 2009
@@ -19,6 +19,7 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 import junit.framework.TestCase;
 
@@ -118,7 +119,7 @@
     assertTrue(ts != null);
     assertEquals(TaskStatus.State.FAILED, ts.getRunState());
     // validate tasklogs for task attempt
-    String log = TestMiniMRMapRedDebugScript.readTaskLog(
+    String log = readTaskLog(
                       TaskLog.LogName.STDERR, attemptId, false);
     assertTrue(log.contains(taskLog));
     if (!isCleanup) {
@@ -127,12 +128,49 @@
       assertTrue(log.contains(cleanupLog));
     } else {
       // validate tasklogs for cleanup attempt
-      log = TestMiniMRMapRedDebugScript.readTaskLog(
+      log = readTaskLog(
                  TaskLog.LogName.STDERR, attemptId, true);
       assertTrue(log.contains(cleanupLog));
     }
   }
 
+  /**
+   * Reads tasklog and returns it as string after trimming it.
+   * @param filter Task log filter; can be STDOUT, STDERR,
+   *                SYSLOG, DEBUGOUT, DEBUGERR
+   * @param taskId The task id for which the log has to collected
+   * @param isCleanup whether the task is a cleanup attempt or not.
+   * @return task log as string
+   * @throws IOException
+   */
+  private String readTaskLog(TaskLog.LogName  filter, 
+                                   TaskAttemptID taskId, 
+                                   boolean isCleanup)
+  throws IOException {
+    // string buffer to store task log
+    StringBuffer result = new StringBuffer();
+    int res;
+
+    // reads the whole tasklog into inputstream
+    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
+    // construct string log from inputstream.
+    byte[] b = new byte[65536];
+    while (true) {
+      res = taskLogReader.read(b);
+      if (res > 0) {
+        result.append(new String(b));
+      } else {
+        break;
+      }
+    }
+    taskLogReader.close();
+    
+    // trim the string and return it
+    String str = result.toString();
+    str = str.trim();
+    return str;
+  }
+  
   private void validateJob(RunningJob job, MiniMRCluster mr) 
   throws IOException {
     assertEquals(JobStatus.SUCCEEDED, job.getJobState());

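One nit in the copied helper is worth flagging: the read loop appends the entire 64 KB buffer on every pass (new String(b)) instead of only the res bytes actually read, so short reads can pull stale buffer contents into the returned log. A corrected loop would be:

    // Corrected read loop (sketch): convert only the bytes actually read.
    byte[] b = new byte[65536];
    int res;
    while ((res = taskLogReader.read(b)) > 0) {
      result.append(new String(b, 0, res));
    }
    taskLogReader.close();
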
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=881538&r1=881537&r2=881538&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Nov 17 21:27:34 2009
@@ -665,7 +665,8 @@
     public void map(WritableComparable key, Writable value,
         OutputCollector<WritableComparable, Writable> out, Reporter reporter)
         throws IOException {
-
+      //NOTE- the next line is required for the TestDebugScript test to succeed
+      System.err.println("failing map");
       throw new RuntimeException("failing map");
     }
   }
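
The added stderr line is what gives the debug script something to find: per the NOTE above, the new TestDebugScript (added in this commit but not shown in this diff) presumably wires this failing mapper to a script that surfaces the task's stderr in DEBUGOUT. A hedged sketch of the kind of check such a test could make, reusing the readTaskLog helper shown in TestTaskFail above:

    // Hypothetical assertion; readTaskLog as in TestTaskFail above.
    String debugOut = readTaskLog(TaskLog.LogName.DEBUGOUT, attemptId, false);
    assertTrue(debugOut.contains("failing map"));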