You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/07 08:59:45 UTC

svn commit: r702361 [2/2] - in /hadoop/core/branches/branch-0.19: ./ docs/ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Modified: hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/branches/branch-0.19/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Mon Oct  6 23:59:44 2008
@@ -1594,13 +1594,11 @@
           <li>
             Setup the job during initialization. For example, create
             the temporary output directory for the job during the
-            initialization of the job. The job client does the setup
-            for the job.
+            initialization of the job. 
           </li>
           <li>
             Cleanup the job after the job completion. For example, remove the
-            temporary output directory after the job completion. A separate 
-            task does the cleanupJob at the end of the job.
+            temporary output directory after the job completion.
           </li>
           <li>
             Setup the task temporary output.

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Mon Oct  6 23:59:44 2008
@@ -58,7 +58,7 @@
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
           if (job != null) {
-            job.fail();
+            job.terminateJob(JobStatus.FAILED);
           }
         }
       }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/HistoryViewer.java Mon Oct  6 23:59:44 2008
@@ -93,6 +93,8 @@
     printJobDetails();
     printTaskSummary();
     printJobAnalysis();
+    printTasks("SETUP", "FAILED");
+    printTasks("SETUP", "KILLED");
     printTasks("MAP", "FAILED");
     printTasks("MAP", "KILLED");
     printTasks("REDUCE", "FAILED");
@@ -100,9 +102,11 @@
     printTasks("CLEANUP", "FAILED");
     printTasks("CLEANUP", "KILLED");
     if (printAll) {
+      printTasks("SETUP", "SUCCESS");
       printTasks("MAP", "SUCCESS");
       printTasks("REDUCE", "SUCCESS");
       printTasks("CLEANUP", "SUCCESS");
+      printAllTaskAttempts("SETUP");
       printAllTaskAttempts("MAP");
       printAllTaskAttempts("REDUCE");
       printAllTaskAttempts("CLEANUP");
@@ -219,6 +223,7 @@
     int totalMaps = 0; 
     int totalReduces = 0; 
     int totalCleanups = 0;
+    int totalSetups = 0;
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0; 
@@ -226,12 +231,17 @@
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
     long mapStarted = 0; 
     long mapFinished = 0; 
     long reduceStarted = 0; 
     long reduceFinished = 0; 
     long cleanupStarted = 0;
     long cleanupFinished = 0;
+    long setupStarted = 0;
+    long setupFinished = 0;
 
     Map <String, String> allHosts = new TreeMap<String, String>();
 
@@ -286,6 +296,23 @@
                                             attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
           }
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))){
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(
+                                            attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
+          }
         }
       }
     }
@@ -296,6 +323,14 @@
     taskSummary.append("\nKind\tTotal\t");
     taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime");
     taskSummary.append("\n");
+    taskSummary.append("\nSetup\t").append(totalSetups);
+    taskSummary.append("\t").append(numFinishedSetups);
+    taskSummary.append("\t\t").append(numFailedSetups);
+    taskSummary.append("\t").append(numKilledSetups);
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupStarted, 0));
+    taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff(
+                               dateFormat, setupFinished, setupStarted)); 
     taskSummary.append("\nMap\t").append(totalMaps);
     taskSummary.append("\t").append(job.getInt(Keys.FINISHED_MAPS));
     taskSummary.append("\t\t").append(numFailedMaps);

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobClient.java Mon Oct  6 23:59:44 2008
@@ -262,6 +262,15 @@
     }
 
     /**
+     * A float between 0.0 and 1.0, indicating the % of setup work
+     * completed.
+     */
+    public float setupProgress() throws IOException {
+      ensureFreshStatus();
+      return status.setupProgress();
+    }
+
+    /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
@@ -813,13 +822,6 @@
       out.close();
     }
 
-    // skip doing setup if there are no maps for the job.
-    // because if there are no maps, job is considered completed and successful
-    if (splits.length != 0) {
-      // do setupJob
-      job.getOutputCommitter().setupJob(new JobContext(job));
-    }
-
     //
     // Now, actually submit the job (using the submit name)
     //
@@ -1039,7 +1041,18 @@
   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getCleanupTaskReports(jobId);
   }
-  
+
+  /**
+   * Get the information of the current state of the setup tasks of a job.
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the setup tips.
+   * @throws IOException
+   */    
+  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
+    return jobSubmitClient.getSetupTaskReports(jobId);
+  }
+
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobHistory.java Mon Oct  6 23:59:44 2008
@@ -123,7 +123,7 @@
    * most places in history file. 
    */
   public static enum Values {
-    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING
+    SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
   }
 
   // temp buffer for parsed dataa
@@ -923,12 +923,14 @@
     }
     /**
      * Logs launch time of job. 
+     * 
      * @param jobId job id, assigned by jobtracker. 
      * @param startTime start time of job. 
      * @param totalMaps total maps assigned by jobtracker. 
      * @param totalReduces total reduces. 
      */
-    public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
+    public static void logInited(JobID jobId, long startTime, 
+                                 int totalMaps, int totalReduces) {
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -940,10 +942,45 @@
               new String[] {jobId.toString(), String.valueOf(startTime), 
                             String.valueOf(totalMaps), 
                             String.valueOf(totalReduces), 
+                            Values.PREP.name()}); 
+        }
+      }
+    }
+    
+    /**
+     * Logs the job as RUNNING. 
+     *
+     * @param jobId job id, assigned by jobtracker. 
+     * @param startTime start time of job (ignored by this overload). 
+     * @param totalMaps total maps assigned by jobtracker (ignored by this overload). 
+     * @param totalReduces total reduces (ignored by this overload). 
+     * @deprecated Use {@link #logInited(JobID, long, int, int)} and 
+     * {@link #logStarted(JobID)}
+     */
+    @Deprecated
+    public static void logStarted(JobID jobId, long startTime, 
+                                  int totalMaps, int totalReduces) {
+      logStarted(jobId);
+    }
+    
+    /**
+     * Logs the job as RUNNING by writing a Job record with JOB_STATUS set to RUNNING. 
+     * @param jobId job id, assigned by jobtracker. 
+     */
+    public static void logStarted(JobID jobId){
+      if (!disableHistory){
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
+        if (null != writer){
+          JobHistory.log(writer, RecordTypes.Job, 
+              new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+              new String[] {jobId.toString(),  
                             Values.RUNNING.name()}); 
         }
       }
     }
+    
     /**
      * Log job finished. closes the job file in history. 
      * @param jobId job id, assigned by jobtracker. 
@@ -1197,12 +1234,11 @@
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
      * @deprecated Use 
-     *             {@link #logStarted(TaskAttemptID, long, String, int, 
-     *                                boolean)}
+     *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
     }
     
     /**
@@ -1212,11 +1248,11 @@
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param trackerName name of the tracker executing the task attempt.
      * @param httpPort http port of the task tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or MAP), logged as TASK_TYPE 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
-                                  boolean isCleanup){
+                                  String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1226,8 +1262,7 @@
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
@@ -1242,13 +1277,12 @@
      * @param finishTime finish time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, boolean, String, 
-     *                     Counters)}
+     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, false, "", 
+      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
                   new Counters());
     }
 
@@ -1258,14 +1292,15 @@
      * @param taskAttemptId task attempt id 
      * @param finishTime finish time
      * @param hostName host name 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or MAP), logged as TASK_TYPE 
      * @param stateString state string of the task attempt
      * @param counter counters of the task attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long finishTime, 
                                    String hostName,
-                                   boolean isCleanup, String stateString, 
+                                   String taskType,
+                                   String stateString, 
                                    Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1277,8 +1312,7 @@
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.MAP.name(), 
+                         new String[]{taskType, 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
@@ -1296,13 +1330,13 @@
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt.
      * @deprecated Use
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
                                  String error) {
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     }
 
     /**
@@ -1312,11 +1346,11 @@
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or MAP), logged as TASK_TYPE 
      */
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
-                                 String error, boolean isCleanup) {
+                                 String error, String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1326,8 +1360,7 @@
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() :
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(),
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(),
@@ -1344,12 +1377,12 @@
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)}
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)}
      */
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
     } 
     
     /**
@@ -1359,11 +1392,11 @@
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or MAP), logged as TASK_TYPE 
      */
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName,
-                                 String error, boolean isCleanup){
+                                 String error, String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1374,8 +1407,7 @@
                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME,
                                     Keys.ERROR},
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.MAP.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(),
                                        Values.KILLED.name(),
@@ -1396,12 +1428,12 @@
      * @param startTime start time
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logStarted(TaskAttemptID, long, String, int, boolean)}
+     * {@link #logStarted(TaskAttemptID, long, String, int, String)}
      */
     @Deprecated
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
-      logStarted(taskAttemptId, startTime, hostName, -1, false);
+      logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
     }
     
     /**
@@ -1411,11 +1443,12 @@
      * @param startTime start time
      * @param trackerName tracker name 
      * @param httpPort the http port of the tracker executing the task attempt
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or REDUCE), logged as TASK_TYPE 
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String trackerName, 
-                                  int httpPort, boolean isCleanup) {
+                                  int httpPort, 
+                                  String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1425,8 +1458,7 @@
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
                                       Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
@@ -1443,15 +1475,15 @@
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, long, long, String, boolean, 
-     *                     String, Counters)}
+     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
-                  finishTime, hostName, false, "", new Counters());
+                  finishTime, hostName, Values.REDUCE.name(),
+                  "", new Counters());
     }
     
     /**
@@ -1462,14 +1494,14 @@
      * @param sortFinished sort finish time
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or REDUCE), logged as TASK_TYPE 
      * @param stateString the state string of the attempt
      * @param counter counters of the attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
                                    long shuffleFinished, 
                                    long sortFinished, long finishTime, 
-                                   String hostName, boolean isCleanup,
+                                   String hostName, String taskType,
                                    String stateString, Counters counter) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
@@ -1482,8 +1514,7 @@
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{isCleanup ? Values.CLEANUP.name() : 
-                                                  Values.REDUCE.name(),
+                         new String[]{taskType,
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(), 
@@ -1503,12 +1534,12 @@
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @deprecated Use 
-     * {@link #logFailed(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
-      logFailed(taskAttemptId, timestamp, hostName, error, false);
+      logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     
     /**
@@ -1518,11 +1549,11 @@
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or REDUCE), logged as TASK_TYPE 
      */
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1533,8 +1564,7 @@
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME,
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(), 
+                         new String[]{ taskType, 
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        Values.FAILED.name(), 
@@ -1550,12 +1580,12 @@
      * @param hostName host name of the task attempt.  
      * @param error error message of the task.
      * @deprecated Use 
-     * {@link #logKilled(TaskAttemptID, long, String, String, boolean)} 
+     * {@link #logKilled(TaskAttemptID, long, String, String, String)} 
      */
     @Deprecated
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error) {
-      logKilled(taskAttemptId, timestamp, hostName, error, false);
+      logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
     }
     
     /**
@@ -1565,11 +1595,11 @@
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
-     * @param isCleanup Whether the attempt is cleanup or not 
-     */
+     * @param taskType name of the attempt's task type (CLEANUP, SETUP or REDUCE), logged as TASK_TYPE 
+     */
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
-                                 boolean isCleanup) {
+                                 String taskType) {
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
                                                    + taskAttemptId.getJobID()); 
@@ -1580,8 +1610,7 @@
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.ERROR },
-                         new String[]{ isCleanup ? Values.CLEANUP.name() : 
-                                                   Values.REDUCE.name(),
+                         new String[]{ taskType,
                                        taskAttemptId.getTaskID().toString(), 
                                        taskAttemptId.toString(), 
                                        Values.KILLED.name(), 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct  6 23:59:44 2008
@@ -63,6 +63,7 @@
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
   TaskInProgress cleanup[] = new TaskInProgress[0];
+  TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
   
@@ -83,6 +84,7 @@
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
+  private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
 
@@ -382,12 +384,13 @@
       // Finished time need to be setted here to prevent this job to be retired
       // from the job tracker jobs at the next retire iteration.
       this.finishTime = this.launchTime;
+      status.setSetupProgress(1.0f);
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setCleanupProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited.set(true);
-      JobHistory.JobInfo.logStarted(profile.getJobID(), 
+      JobHistory.JobInfo.logInited(profile.getJobID(), 
                                     this.launchTime, 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
                                      this.finishTime, 0, 0, 0, 0,
@@ -422,12 +425,22 @@
                        numReduceTasks, jobtracker, conf, this);
     cleanup[1].setCleanupTask();
 
-    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, 0.0f, 
-                                          JobStatus.RUNNING, status.getJobPriority());
+    // create two setup tips, one map and one reduce.
+    setup = new TaskInProgress[2];
+    // setup map tip. This map doesn't use a split, 
+    // so just assign splits[0]
+    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+            jobtracker, conf, this, numMapTasks + 1 );
+    setup[0].setSetupTask();
+
+    // setup reduce tip.
+    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                       numReduceTasks + 1, jobtracker, conf, this);
+    setup[1].setSetupTask();
+    
     tasksInited.set(true);
-        
-    JobHistory.JobInfo.logStarted(profile.getJobID(), this.launchTime, 
-                                  numMapTasks, numReduceTasks);
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
   }
 
   /////////////////////////////////////////////////////
@@ -533,6 +546,14 @@
   }
   
   /**
+   * Get the list of setup tasks
+   * @return the array of setup tasks for the job
+   */
+  TaskInProgress[] getSetupTasks() {
+    return setup;
+  }
+  
+  /**
    * Get the list of reduce tasks
    * @return the raw array of reduce tasks for this job
    */
@@ -611,6 +632,21 @@
     return results;
   }
 
+  /**
+   * Return a vector of the setup TaskInProgress objects whose isComplete() equals shouldBeComplete
+   */
+  public synchronized Vector<TaskInProgress> reportSetupTIPs(
+                                               boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
+    for (int i = 0; i < setup.length; i++) {
+      if (setup[i].isComplete() == shouldBeComplete) {
+        results.add(setup[i]);
+      }
+    }
+    return results;
+  }
+
   ////////////////////////////////////////////////////
   // Status update methods
   ////////////////////////////////////////////////////
@@ -655,7 +691,9 @@
                                             taskCompletionEventTracker, 
                                             taskid,
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
@@ -698,7 +736,9 @@
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
                                             taskid,
                                             tip.idWithinJob(),
-                                            status.getIsMap(),
+                                            status.getIsMap() &&
+                                            !tip.isCleanupTask() &&
+                                            !tip.isSetupTask(),
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                            );
@@ -727,7 +767,7 @@
                  oldProgress + " to " + tip.getProgress());
     }
     
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
         if (maps.length == 0) {
@@ -860,11 +900,7 @@
     // Now launch the cleanupTask
     Task result = tip.getTaskToRun(tts.getTrackerName());
     if (result != null) {
-      launchedCleanup = true;
-      if (tip.isFirstAttempt(result.getTaskID())) {
-        JobHistory.Task.logStarted(tip.getTIPId(), 
-          Values.CLEANUP.name(), System.currentTimeMillis(), "");
-      }
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
     }
     return result;
   }
@@ -878,8 +914,12 @@
    * @return true/false
    */
   private synchronized boolean canLaunchCleanupTask() {
+    if (!tasksInited.get()) {
+      return false;
+    }
     // check if the job is running
-    if (status.getRunState() != JobStatus.RUNNING) {
+    if (status.getRunState() != JobStatus.RUNNING &&
+        status.getRunState() != JobStatus.PREP) {
       return false;
     }
     // check if cleanup task has been launched already. 
@@ -899,8 +939,63 @@
     }
     return launchCleanupTask;
   }
+
+  /**
+   * Return a SetupTask, if appropriate, to run on the given tasktracker
+   * 
+   */
+  public synchronized Task obtainSetupTask(TaskTrackerStatus tts, 
+                                             int clusterSize, 
+                                             int numUniqueHosts,
+                                             boolean isMapSlot
+                                            ) throws IOException {
+    if (!canLaunchSetupTask()) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    
+    List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+    if (isMapSlot) {
+      setupTaskList.add(setup[0]);
+    } else {
+      setupTaskList.add(setup[1]);
+    }
+    TaskInProgress tip = findTaskFromList(setupTaskList,
+                           tts, numUniqueHosts, false);
+    if (tip == null) {
+      return null;
+    }
+    
+    // Now launch the setupTask
+    Task result = tip.getTaskToRun(tts.getTrackerName());
+    if (result != null) {
+      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+    }
+    return result;
+  }
   
   /**
+   * Check whether setup task can be launched for the job.
+   * 
+   * Setup task can be launched only when the tasks are inited,
+   * the Job is in PREP state,
+   * the setup task is not already launched,
+   * and the job is not Killed/Failed
+   * @return true/false
+   */
+  private synchronized boolean canLaunchSetupTask() {
+    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
+           !launchedSetup && !jobKilled && !jobFailed);
+  }
+  
+
+  /**
    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
    * We don't have cache-sensitivity for reduce tasks, as they
    *  work on temporary MapRed files.  
@@ -955,8 +1050,14 @@
                                         boolean isScheduled) {
     // keeping the earlier ordering intact
     String name;
-    Enum counter;
-    if (tip.isMapTask()) {
+    Enum counter = null;
+    if (tip.isSetupTask()) {
+      launchedSetup = true;
+      name = Values.SETUP.name();
+    } else if (tip.isCleanupTask()) {
+      launchedCleanup = true;
+      name = Values.CLEANUP.name();
+    } else if (tip.isMapTask()) {
       ++runningMapTasks;
       name = Values.MAP.name();
       counter = Counter.TOTAL_LAUNCHED_MAPS;
@@ -975,7 +1076,9 @@
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), "");
     }
-    jobCounters.incrCounter(counter, 1);
+    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+      jobCounters.incrCounter(counter, 1);
+    }
     
     // Make an entry in the tip if the attempt is not scheduled i.e externally
     // added
@@ -994,7 +1097,7 @@
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
-    if (tip.isMapTask()) {
+    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
@@ -1648,65 +1751,45 @@
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()){
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        status.getTaskTracker(), 
                                        ttStatus.getHttpPort(), 
-                                       tip.isCleanupTask()); 
+                                       taskType); 
       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
-                                        trackerHostname, tip.isCleanupTask(),
+                                        trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.MAP.name(), 
-                                  tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }else{
       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
                                           status.getTaskTracker(),
                                           ttStatus.getHttpPort(), 
-                                          tip.isCleanupTask()); 
+                                          taskType); 
       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
-                                           trackerHostname, tip.isCleanupTask(), 
+                                           trackerHostname, 
+                                           taskType,
                                            status.getStateString(), 
                                            status.getCounters()); 
-      JobHistory.Task.logFinished(tip.getTIPId(), 
-                                  tip.isCleanupTask() ? Values.CLEANUP.name() :
-                                  Values.REDUCE.name(), tip.getExecFinishTime(),
-                                  status.getCounters()); 
     }
+    JobHistory.Task.logFinished(tip.getTIPId(), 
+                                taskType,
+                                tip.getExecFinishTime(),
+                                status.getCounters()); 
         
     int newNumAttempts = tip.getActiveTasks().size();
-    if (!tip.isCleanupTask()) {
-      if (tip.isMapTask()) {
-        runningMapTasks -= 1;
-        // check if this was a sepculative task
-        if (oldNumAttempts > 1) {
-          speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedMapTasks += 1;
-        metrics.completeMap(taskid);
-        // remove the completed map from the resp running caches
-        retireMap(tip);
-        if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
-          this.status.setMapProgress(1.0f);
-        }
-      } else {
-        runningReduceTasks -= 1;
-        if (oldNumAttempts > 1) {
-          speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-        }
-        finishedReduceTasks += 1;
-        metrics.completeReduce(taskid);
-        // remove the completed reduces from the running reducers set
-        retireReduce(tip);
-        if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
-          this.status.setReduceProgress(1.0f);
-        }
-      }
-    } else {
+    if (tip.isSetupTask()) {
+      // setup task has finished. kill the extra setup tip
+      killSetupTip(!tip.isMapTask());
+      // Job can start running now.
+      this.status.setSetupProgress(1.0f);
+      this.status.setRunState(JobStatus.RUNNING);
+      JobHistory.JobInfo.logStarted(profile.getJobID());
+    } else if (tip.isCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
         // kill the reduce tip
@@ -1730,7 +1813,31 @@
       // The job has been killed/failed/successful
       // JobTracker should cleanup this task
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-      return false;
+    } else if (tip.isMapTask()) {
+      runningMapTasks -= 1;
+      // check if this was a speculative task
+      if (oldNumAttempts > 1) {
+        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedMapTasks += 1;
+      metrics.completeMap(taskid);
+      // remove the completed map from the resp running caches
+      retireMap(tip);
+      if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
+        this.status.setMapProgress(1.0f);
+      }
+    } else {
+      runningReduceTasks -= 1;
+      if (oldNumAttempts > 1) {
+        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+      }
+      finishedReduceTasks += 1;
+      metrics.completeReduce(taskid);
+      // remove the completed reduces from the running reducers set
+      retireReduce(tip);
+      if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
+        this.status.setReduceProgress(1.0f);
+      }
     }
     
     return true;
@@ -1764,7 +1871,7 @@
     }
   }
   
-  private synchronized void terminateJob(int jobTerminationState) {
+  synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
       if (jobTerminationState == JobStatus.FAILED) {
@@ -1859,6 +1966,8 @@
     if (wasRunning && !isRunning) {
       if (tip.isCleanupTask()) {
         launchedCleanup = false;
+      } else if (tip.isSetupTask()) {
+        launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
         // remove from the running queue and put it in the non-running cache
@@ -1896,36 +2005,41 @@
     // update job history
     String taskTrackerName = taskTrackerStatus.getHost();
     long finishTime = status.getFinishTime();
+    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isSetupTask() ? Values.SETUP.name() :
+                      tip.isMapTask() ? Values.MAP.name() : 
+                      Values.REDUCE.name();
     if (status.getIsMap()) {
       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), tip.isCleanupTask());
+                taskTrackerName, status.getDiagnosticInfo(), 
+                taskType);
       } else {
         JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(),
-                tip.isCleanupTask());
+                taskType);
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
           status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          tip.isCleanupTask());
+          taskType);
       if (status.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       } else {
         JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
                 taskTrackerName, status.getDiagnosticInfo(), 
-                tip.isCleanupTask());
+                taskType);
       }
     }
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask()) {
+    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
       } else {
@@ -1955,7 +2069,7 @@
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = tip.isCleanupTask() ? true :
+      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -1963,12 +2077,8 @@
       if (killJob) {
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
-                                  tip.isCleanupTask() ?
-                                    Values.CLEANUP.name() :
-                                  tip.isMapTask() ? 
-                                          Values.MAP.name() : 
-                                          Values.REDUCE.name(),  
-                                          status.getFinishTime(), 
+                                  taskType,  
+                                  status.getFinishTime(), 
                                   status.getDiagnosticInfo());
         if (tip.isCleanupTask()) {
           // kill the other tip
@@ -1979,6 +2089,10 @@
           }
           terminateJob(JobStatus.FAILED);
         } else {
+          if (tip.isSetupTask()) {
+            // kill the other tip
+            killSetupTip(!tip.isMapTask());
+          }
           fail();
         }
       }
@@ -1986,7 +2100,7 @@
       //
       // Update the counters
       //
-      if (!tip.isCleanupTask()) {
+      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
@@ -1996,6 +2110,14 @@
     }
   }
 
+  void killSetupTip(boolean isMap) {
+    if (isMap) {
+      setup[0].kill();
+    } else {
+      setup[1].kill();
+    }
+  }
+
   /**
    * Fail a task with a given reason, but without a status object.
    * @param tip The task's tip
@@ -2018,6 +2140,7 @@
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
                               tip.isCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
   }
@@ -2072,18 +2195,24 @@
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
-      if (tipid.getId() == maps.length) { // cleanup map tip
+      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
         return cleanup[0]; 
       }
+      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+        return setup[0];
+      }
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
         }
       }
     } else {
-      if (tipid.getId() == reduces.length) { // cleanup reduce tip
+      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
         return cleanup[1]; 
       }
+      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+        return setup[1];
+      }
       for (int i = 0; i < reduces.length; i++) {
         if (tipid.equals(reduces[i].getTIPId())){
           return reduces[i];

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobStatus.java Mon Oct  6 23:59:44 2008
@@ -52,6 +52,7 @@
   private float mapProgress;
   private float reduceProgress;
   private float cleanupProgress;
+  private float setupProgress;
   private int runState;
   private long startTime;
   private String user;
@@ -99,7 +100,25 @@
    */
    public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
                       float cleanupProgress, int runState, JobPriority jp) {
+     this(jobid, 0.0f, mapProgress, reduceProgress, 
+          cleanupProgress, runState, jp);
+   }
+   
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+   public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                    float reduceProgress, float cleanupProgress, 
+                    int runState, JobPriority jp) {
      this.jobid = jobid;
+     this.setupProgress = setupProgress;
      this.mapProgress = mapProgress;
      this.reduceProgress = reduceProgress;
      this.cleanupProgress = cleanupProgress;
@@ -110,6 +129,7 @@
      }
      priority = jp;
    }
+   
   /**
    * @deprecated use getJobID instead
    */
@@ -148,6 +168,19 @@
   }
 
   /**
+   * @return Percentage of progress in setup 
+   */
+  public synchronized float setupProgress() { return setupProgress; }
+    
+  /**
+   * Sets the setup progress of this job
+   * @param p The value of setup progress to set to
+   */
+  synchronized void setSetupProgress(float p) { 
+    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
+  /**
    * @return Percentage of progress in reduce 
    */
   public synchronized float reduceProgress() { return reduceProgress; }
@@ -232,6 +265,7 @@
   ///////////////////////////////////////
   public synchronized void write(DataOutput out) throws IOException {
     jobid.write(out);
+    out.writeFloat(setupProgress);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
     out.writeFloat(cleanupProgress);
@@ -244,6 +278,7 @@
 
   public synchronized void readFields(DataInput in) throws IOException {
     this.jobid = JobID.read(in);
+    this.setupProgress = in.readFloat();
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
     this.cleanupProgress = in.readFloat();

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Oct  6 23:59:44 2008
@@ -47,8 +47,10 @@
    *             and getAllJobs(queue) as a part of HADOOP-3930
    * Version 14: Added setPriority for HADOOP-4124
    * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
+   * Version 16: Added getSetupTaskReports and 
+   *             setupProgress to JobStatus as part of HADOOP-4261           
    */
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
 
   /**
    * Allocate a name for the job.
@@ -123,6 +125,11 @@
   public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException;
 
   /**
+   * Grab a bunch of info on the setup tasks that make up the job
+   */
+  public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException;
+
+  /**
    * A MapReduce system always operates on a single filesystem.  This 
    * function returns the fs name.  ('local' if the localfs; 'addr:port' 
    * if dfs).  The client can then copy files into the right locations 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Oct  6 23:59:44 2008
@@ -531,7 +531,7 @@
       // is updated
       private void checkAndInit() throws IOException {
         String jobStatus = this.job.get(Keys.JOB_STATUS);
-        if (Values.RUNNING.name().equals(jobStatus)) {
+        if (Values.PREP.name().equals(jobStatus)) {
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           jip.initTasks();
@@ -1860,7 +1860,7 @@
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
-        List<Task> tasks = getCleanupTask(taskTrackerStatus);
+        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
           tasks = taskScheduler.assignTasks(taskTrackerStatus);
         }
@@ -2099,8 +2099,9 @@
     return null;
   }
   
-  private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
-  throws IOException {
+  // returns cleanup tasks first, then setup tasks.
+  private synchronized List<Task> getSetupAndCleanupTasks(
+    TaskTrackerStatus taskTracker) throws IOException {
     int maxMapTasks = taskTracker.getMaxMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int numMaps = taskTracker.countMapTasks();
@@ -2120,6 +2121,15 @@
             return Collections.singletonList(t);
           }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                  numUniqueHosts, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
       if (numReduces < maxReduceTasks) {
         for (Iterator<JobInProgress> it = jobs.values().iterator();
@@ -2131,6 +2141,15 @@
             return Collections.singletonList(t);
           }
         }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
       }
     }
     return null;
@@ -2353,6 +2372,28 @@
   
   }
   
+  public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
+    JobInProgress job = jobs.get(jobid);
+    if (job == null) {
+      return new TaskReport[0];
+    } else {
+      Vector<TaskReport> reports = new Vector<TaskReport>();
+      Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
+      for (Iterator<TaskInProgress> it = completeTasks.iterator();
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
+      for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
+           it.hasNext();) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        reports.add(tip.generateSingleReport());
+      }
+      return reports.toArray(new TaskReport[reports.size()]);
+    }
+  }
+  
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
   
   /* 
@@ -2576,7 +2617,8 @@
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && job.desiredReduces() != 0)) {
+            (tip.isMapTask() && !tip.isSetupTask() && 
+             job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING) {
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Mon Oct  6 23:59:44 2008
@@ -114,6 +114,8 @@
         }
         JobContext jContext = new JobContext(conf);
         OutputCommitter outputCommitter = job.getOutputCommitter();
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
@@ -336,6 +338,9 @@
   public TaskReport[] getCleanupTaskReports(JobID id) {
     return new TaskReport[0];
   }
+  public TaskReport[] getSetupTaskReports(JobID id) {
+    return new TaskReport[0];
+  }
 
   public JobStatus getJobStatus(JobID id) {
     Job job = jobs.get(id);

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/MapTask.java Mon Oct  6 23:59:44 2008
@@ -283,6 +283,10 @@
       runCleanup(umbilical);
       return;
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
 
     int numReduceTasks = conf.getNumReduceTasks();
     LOG.info("numReduceTasks: " + numReduceTasks);

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/OutputCommitter.java Mon Oct  6 23:59:44 2008
@@ -30,12 +30,10 @@
  *   <li>
  *   Setup the job during initialization. For example, create the temporary 
  *   output directory for the job during the initialization of the job.
- *   The job client does the setup for the job.
  *   </li>
  *   <li>
  *   Cleanup the job after the job completion. For example, remove the
- *   temporary output directory after the job completion. CleanupJob is done
- *   by a separate task at the end of the job.
+ *   temporary output directory after the job completion. 
  *   </li>
  *   <li>
  *   Setup the task temporary output.
@@ -61,8 +59,6 @@
   /**
    * For the framework to setup the job output during initialization
    * 
-   * The job client does the setup for the job.
-   *   
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
    */

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Oct  6 23:59:44 2008
@@ -336,7 +336,7 @@
     job.setBoolean("mapred.skip.on", isSkipping());
     Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
 
-    if (!cleanupJob) {
+    if (!cleanupJob && !setupJob) {
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
@@ -351,6 +351,10 @@
       runCleanup(umbilical);
       return;
     }
+    if (setupJob) {
+      runSetupJob(umbilical);
+      return;
+    }
     
     // Initialize the codec
     codec = initCodec();

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/RunningJob.java Mon Oct  6 23:59:44 2008
@@ -94,6 +94,15 @@
   public float cleanupProgress() throws IOException;
 
   /**
+   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
+   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup-tasks.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException;
+
+  /**
    * Check if the job is finished or not. 
    * This is a non-blocking call.
    * 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/Task.java Mon Oct  6 23:59:44 2008
@@ -108,6 +108,7 @@
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
+  protected boolean setupJob = false;
   private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
@@ -241,6 +242,9 @@
     cleanupJob = true;
   }
 
+  public void setSetupTask() {
+    setupJob = true; 
+  }
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -253,6 +257,7 @@
     skipRanges.write(out);
     out.writeBoolean(skipping);
     out.writeBoolean(cleanupJob);
+    out.writeBoolean(setupJob);
     out.writeBoolean(writeSkipRecs);
   }
   public void readFields(DataInput in) throws IOException {
@@ -266,6 +271,7 @@
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
     cleanupJob = in.readBoolean();
+    setupJob = in.readBoolean();
     writeSkipRecs = in.readBoolean();
   }
 
@@ -718,6 +724,14 @@
     conf.getOutputCommitter().cleanupJob(jobContext);
     done(umbilical);
   }
+
+  protected void runSetupJob(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // do the setup
+    getProgress().setStatus("setup");
+    conf.getOutputCommitter().setupJob(jobContext);
+    done(umbilical);
+  }
   
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Mon Oct  6 23:59:44 2008
@@ -83,6 +83,7 @@
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
   private boolean cleanup = false; 
+  private boolean setup = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -180,7 +181,15 @@
   public void setCleanupTask() {
     cleanup = true;
   }
-  
+
+  public boolean isSetupTask() {
+    return setup;
+  }
+	  
+  public void setSetupTask() {
+    setup = true;
+  }
+
   public boolean isOnlyCommitPending() {
     for (TaskStatus t : taskStatuses.values()) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -380,7 +389,8 @@
         (job.getStatus().getRunState() != JobStatus.RUNNING)) {
       tasksReportedClosed.add(taskid);
       close = true;
-    } else if (isComplete() && !(isMapTask() && isComplete(taskid)) &&
+    } else if (isComplete() && 
+               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
@@ -565,7 +575,7 @@
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
-    if (this.isMapTask() && isComplete(taskid) && 
+    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       
@@ -850,6 +860,9 @@
     if (cleanup) {
       t.setCleanupTask();
     }
+    if (setup) {
+      t.setSetupTask();
+    }
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
     t.setSkipRanges(failedRanges.getSkipRanges());
@@ -951,7 +964,7 @@
   }
 
   public long getMapInputSize() {
-    if(isMapTask()) {
+    if(isMapTask() && !setup && !cleanup) {
       return rawSplit.getDataLength();
     } else {
       return 0;

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Oct  6 23:59:44 2008
@@ -464,9 +464,9 @@
   }
     
   /**
-   * Get the map task completion events
+   * Get the task completion events
    */
-  public TaskCompletionEvent[] getMapTaskCompletionEvents(JobID id, int from, 
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID id, int from, 
                                                           int max) 
   throws IOException {
     return jobTracker.getJobTracker().getTaskCompletionEvents(id, from, max);

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Mon Oct  6 23:59:44 2008
@@ -471,8 +471,9 @@
     }
     
     TaskCompletionEvent[] prevEvents = 
-      mr.getMapTaskCompletionEvents(id, 0, numMaps);
-    TaskReport[] prevReports = jobClient.getMapTaskReports(id);
+      mr.getTaskCompletionEvents(id, 0, numMaps);
+    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
+    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
     ClusterStatus prevStatus = jobClient.getClusterStatus();
     
     mr.stopJobTracker();
@@ -502,7 +503,7 @@
     
     // Get the new jobtrackers events
     TaskCompletionEvent[] jtEvents =  
-      mr.getMapTaskCompletionEvents(id, 0, 2 * numMaps);
+      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
     
     // Test if all the events that were recovered match exactly
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
@@ -521,8 +522,10 @@
     
     // Check the task reports
     // The reports should match exactly if the attempts are same
-    TaskReport[] afterReports = jobClient.getMapTaskReports(id);
-    testTaskReports(prevReports, afterReports, numToMatch);
+    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
+    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
+    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
+    testTaskReports(prevSetupReports, afterSetupReports, 1);
     
     //  Signal the reduce tasks
     signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
@@ -829,4 +832,4 @@
   public static void main(String[] args) throws IOException {
     new TestJobTrackerRestart().testJobTrackerRestart();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Oct  6 23:59:44 2008
@@ -70,7 +70,10 @@
       // test the task report fetchers
       JobClient client = new JobClient(job);
       JobID jobid = ret.job.getID();
-      TaskReport[] reports = client.getMapTaskReports(jobid);
+      TaskReport[] reports;
+      reports = client.getSetupTaskReports(jobid);
+      assertEquals("number of setups", 2, reports.length);
+      reports = client.getMapTaskReports(jobid);
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);

Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobdetails.jsp Mon Oct  6 23:59:44 2008
@@ -88,15 +88,15 @@
               "</td></tr>\n");
   }
 
-  private void printCleanupTaskSummary(JspWriter out,
+  private void printJobLevelTaskSummary(JspWriter out,
                                 String jobId,
+                                String kind,
                                 TaskInProgress[] tasks
                                ) throws IOException {
     int totalTasks = tasks.length;
     int runningTasks = 0;
     int finishedTasks = 0;
     int killedTasks = 0;
-    String kind = "cleanup";
     for(int i=0; i < totalTasks; ++i) {
       TaskInProgress task = tasks[i];
       if (task.isComplete()) {
@@ -208,6 +208,9 @@
     out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n");
     out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">" 
               + profile.getJobFile() + "</a><br>\n");
+    out.print("<b>Job Setup:</b>");
+    printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+    out.print("<br>\n");
     if (runState == JobStatus.RUNNING) {
       out.print("<b>Status:</b> Running<br>\n");
       out.print("<b>Started at:</b> " + new Date(job.getStartTime()) + "<br>\n");
@@ -238,7 +241,7 @@
       }
     }
     out.print("<b>Job Cleanup:</b>");
-    printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+    printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
     out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 

Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobdetailshistory.jsp Mon Oct  6 23:59:44 2008
@@ -42,6 +42,7 @@
     int totalMaps = 0 ; 
     int totalReduces = 0;
     int totalCleanups = 0; 
+    int totalSetups = 0; 
     int numFailedMaps = 0; 
     int numKilledMaps = 0;
     int numFailedReduces = 0 ; 
@@ -49,6 +50,9 @@
     int numFinishedCleanups = 0;
     int numFailedCleanups = 0;
     int numKilledCleanups = 0;
+    int numFinishedSetups = 0;
+    int numFailedSetups = 0;
+    int numKilledSetups = 0;
 	
     long mapStarted = 0 ; 
     long mapFinished = 0 ; 
@@ -56,6 +60,8 @@
     long reduceFinished = 0;
     long cleanupStarted = 0;
     long cleanupFinished = 0; 
+    long setupStarted = 0;
+    long setupFinished = 0; 
         
     Map <String,String> allHosts = new TreeMap<String,String>();
     for (JobHistory.Task task : tasks.values()) {
@@ -104,6 +110,21 @@
             numFailedCleanups++;
           } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
             numKilledCleanups++;
+          } 
+        } else if (Values.SETUP.name().equals(task.get(Keys.TASK_TYPE))) {
+          if (setupStarted==0||setupStarted > startTime) {
+            setupStarted = startTime ; 
+          }
+          if (setupFinished < finishTime) {
+            setupFinished = finishTime; 
+          }
+          totalSetups++; 
+          if (Values.SUCCESS.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFinishedSetups++;
+          } else if (Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numFailedSetups++;
+          } else if (Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS))) {
+            numKilledSetups++;
           }
         }
       }
@@ -117,6 +138,19 @@
 <td>Kind</td><td>Total Tasks(successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
 </tr>
 <tr>
+<td>Setup</td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=all">
+        <%=totalSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.SUCCESS %>">
+        <%=numFinishedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.FAILED %>">
+        <%=numFailedSetups%></a></td>
+    <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.SETUP.name() %>&status=<%=Values.KILLED %>">
+        <%=numKilledSetups%></a></td>  
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupStarted, 0) %></td>
+    <td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, setupFinished, setupStarted) %></td>
+</tr>
+<tr>
 <td>Map</td>
     <td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=encodedLogFileName%>&taskType=<%=Values.MAP.name() %>&status=all">
         <%=totalMaps %></a></td>

Modified: hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/jobtasks.jsp Mon Oct  6 23:59:44 2008
@@ -21,7 +21,7 @@
   }
   String type = request.getParameter("type");
   String pagenum = request.getParameter("pagenum");
-  TaskInProgress[] tasks;
+  TaskInProgress[] tasks = null;
   String state = request.getParameter("state");
   state = (state!=null) ? state : "all";
   int pnum = Integer.parseInt(pagenum);
@@ -42,9 +42,12 @@
   else if ("reduce".equals(type)) {
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
-  } else {
+  } else if ("cleanup".equals(type)) {
     reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getCleanupTasks() : null;
+  } else if ("setup".equals(type)) {
+    reports = (job != null) ? tracker.getSetupTaskReports(jobidObj) : null;
+    tasks = (job != null) ? job.getSetupTasks() : null;
   }
 %>
 

Modified: hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp?rev=702361&r1=702360&r2=702361&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp Mon Oct  6 23:59:44 2008
@@ -69,9 +69,12 @@
     }
     TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
         : null;
-    boolean isCleanup = false;
+    boolean isCleanupOrSetup = false;
     if (tipidObj != null) { 
-      isCleanup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+      if (!isCleanupOrSetup) {
+        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+      }
     }
 %>
 
@@ -98,7 +101,7 @@
 <table border=2 cellpadding="5" cellspacing="2">
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
   <%
-   if (!ts[0].getIsMap() && !isCleanup) {
+   if (!ts[0].getIsMap() && !isCleanupOrSetup) {
    %>
 <td>Shuffle Finished</td><td>Sort Finished</td>
   <%
@@ -126,7 +129,7 @@
         out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getStartTime(), 0) + "</td>");
-        if (!ts[i].getIsMap() && !isCleanup) {
+        if (!ts[i].getIsMap() && !isCleanupOrSetup) {
           out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");