Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/20 11:13:56 UTC

svn commit: r776631 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/webapps/job/

Author: ddas
Date: Wed May 20 09:13:55 2009
New Revision: 776631

URL: http://svn.apache.org/viewvc?rev=776631&view=rev
Log:
HADOOP-5572. Improves the progress reporting for the sort phase for both maps and reduces. Contributed by Ravi Gummadi.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/core/trunk/src/webapps/job/taskdetails.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 20 09:13:55 2009
@@ -376,6 +376,9 @@
     HADOOP-5873. Remove deprecated methods randomDataNode() and
     getDatanodeByIndex(..) in FSNamesystem.  (szetszwo)
 
+    HADOOP-5572. Improves the progress reporting for the sort phase for both
+    maps and reduces. (Ravi Gummadi via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/Progress.java Wed May 20 09:13:55 2009
@@ -20,19 +20,32 @@
 
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /** Utility to assist with generation of progress reports.  Applications build
  * a hierarchy of {@link Progress} instances, each modelling a phase of
  * execution.  The root is constructed with {@link #Progress()}.  Nodes for
  * sub-phases are created by calling {@link #addPhase()}.
  */
 public class Progress {
+  private static final Log LOG = LogFactory.getLog(Progress.class);
   private String status = "";
   private float progress;
   private int currentPhase;
   private ArrayList<Progress> phases = new ArrayList<Progress>();
   private Progress parent;
-  private float progressPerPhase;
 
+  // Each phase can have a different progress weightage. For example, in a
+  // map task the map phase accounts for 66.7% and the sort phase for 33.3%.
+  // Callers that want weighted phases must supply a weightage for every
+  // phase they add to a Progress object (there is no mixing of weighted and
+  // unweighted phases). When nodes are added without specifying a weightage,
+  // all phases get an equal, fixed weightage.
+  private boolean fixedWeightageForAllPhases = false;
+  private float progressPerPhase = 0.0f;
+  private ArrayList<Float> progressWeightagesForPhases = new ArrayList<Float>();
+  
   /** Creates a new root node. */
   public Progress() {}
 
@@ -43,15 +56,73 @@
     return phase;
   }
 
-  /** Adds a node to the tree. */
+  /** Adds a node to the tree. Gives equal weightage to all phases */
   public synchronized Progress addPhase() {
+    Progress phase = addNewPhase();
+    // set equal weightage for all phases
+    progressPerPhase = 1.0f / (float)phases.size();
+    fixedWeightageForAllPhases = true;
+    return phase;
+  }
+  
+  /** Adds a new phase. The caller needs to set its progress weightage. */
+  private synchronized Progress addNewPhase() {
     Progress phase = new Progress();
     phases.add(phase);
     phase.setParent(this);
-    progressPerPhase = 1.0f / (float)phases.size();
     return phase;
   }
 
+  /** Adds a named node with a specified progress weightage to the tree. */
+  public Progress addPhase(String status, float weightage) {
+    Progress phase = addPhase(weightage);
+    phase.setStatus(status);
+
+    return phase;
+  }
+
+  /** Adds a node with a specified progress weightage to the tree. */
+  public synchronized Progress addPhase(float weightage) {
+    Progress phase = new Progress();
+    progressWeightagesForPhases.add(weightage);
+    phases.add(phase);
+    phase.setParent(this);
+
+    // Ensure that the sum of weightages does not cross 1.0
+    float sum = 0;
+    for (int i = 0; i < phases.size(); i++) {
+      sum += progressWeightagesForPhases.get(i);
+    }
+    if (sum > 1.0) {
+      LOG.warn("Sum of weightages can not be more than 1.0; But sum = " + sum);
+    }
+
+    return phase;
+  }
+
+  /** Adds n nodes to the tree. Gives equal weightage to all phases. */
+  public synchronized void addPhases(int n) {
+    for (int i = 0; i < n; i++) {
+      addNewPhase();
+    }
+    // set equal weightage for all phases
+    progressPerPhase = 1.0f / (float)phases.size();
+    fixedWeightageForAllPhases = true;
+  }
+
+  /**
+   * Returns the progress weightage of the given phase.
+   * @param phaseNum the phase number of the phase (child node) whose
+   *                 progress weightage is needed
+   * @return the progress weightage of the specified phase
+   */
+  float getProgressWeightage(int phaseNum) {
+    if (fixedWeightageForAllPhases) {
+      return progressPerPhase; // all phases are of equal weightage
+    }
+    return progressWeightagesForPhases.get(phaseNum);
+  }
+
   synchronized Progress getParent() { return parent; }
   synchronized void setParent(Progress parent) { this.parent = parent; }
   
@@ -89,8 +160,8 @@
   }
 
   /** Returns the overall progress of the root. */
-  // this method probably does not need to be synchronized as getINternal() is synchronized 
-  // and the node's parent never changes. Still, it doesn't hurt. 
+  // this method probably does not need to be synchronized as getInternal() is
+  // synchronized and the node's parent never changes. Still, it doesn't hurt. 
   public synchronized float get() {
     Progress node = this;
     while (node.getParent() != null) {                 // find the root
@@ -99,13 +170,37 @@
     return node.getInternal();
   }
 
+  /**
+   * Returns the progress of this node alone; get() gives the overall
+   * progress of the root node, not just that of this node.
+   */
+  public synchronized float getProgress() {
+    return getInternal();
+  }
+  
   /** Computes progress in this node. */
   private synchronized float getInternal() {
     int phaseCount = phases.size();
     if (phaseCount != 0) {
-      float subProgress =
-        currentPhase < phaseCount ? phase().getInternal() : 0.0f;
-      return progressPerPhase*(currentPhase + subProgress);
+      float subProgress = 0.0f;
+      float progressFromCurrentPhase = 0.0f;
+      if (currentPhase < phaseCount) {
+        subProgress = phase().getInternal();
+        progressFromCurrentPhase =
+          getProgressWeightage(currentPhase) * subProgress;
+      }
+      
+      float progressFromCompletedPhases = 0.0f;
+      if (fixedWeightageForAllPhases) { // same progress weightage for each phase
+        progressFromCompletedPhases = progressPerPhase * currentPhase;
+      }
+      else {
+        for (int i = 0; i < currentPhase; i++) {
+          // progress weightages of phases could be different. Add them
+          progressFromCompletedPhases += getProgressWeightage(i);
+        }
+      }
+      return  progressFromCompletedPhases + progressFromCurrentPhase;
     } else {
       return progress;
     }
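
For context, a minimal sketch of how the weighted-phase API added above can be
driven. The phase names, weights and values below are illustrative only (they
mirror the map-task split introduced later in this patch), not part of the
patch itself:

    Progress root = new Progress();
    Progress map  = root.addPhase("map", 0.667f);   // weighted child phases;
    Progress sort = root.addPhase("sort", 0.333f);  // weightages sum to ~1.0

    map.set(0.5f);               // the map phase is half done
    // No phase is complete yet, so the root's progress is just the current
    // phase's weightage times its progress: 0.667 * 0.5 = 0.3335
    float overall = root.get();  // ~0.33

    map.complete();              // advances the root to the sort phase
    // Completed phases now contribute their full weightage:
    overall = root.get();        // ~0.667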

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Wed May 20 09:13:55 2009
@@ -20,7 +20,6 @@
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -121,8 +120,8 @@
     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-    TRACKER_NAME, STATE_STRING, VERSION
+    SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+    JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
   }
 
   /**
@@ -1394,29 +1393,56 @@
     /**
      * Log finish time of map task attempt. 
      * @param taskAttemptId task attempt id 
-     * @param finishTime finish time
+     * @param finishTime finish time of map task
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
+     * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+     *  Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
-                  new Counters());
+      logFinished(taskAttemptId, finishTime, finishTime, hostName,
+                  Values.MAP.name(), "", new Counters());
     }
 
     /**
      * Log finish time of map task attempt. 
      * 
      * @param taskAttemptId task attempt id 
-     * @param finishTime finish time
+     * @param finishTime finish time of map task
+     * @param hostName host name 
+     * @param taskType Whether the attempt is cleanup or setup or map 
+     * @param stateString state string of the task attempt
+     * @param counter counters of the task attempt
+     * @deprecated Use 
+     * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+     *  Counters)}
+     */
+    @Deprecated
+    public static void logFinished(TaskAttemptID taskAttemptId,
+                                   long finishTime, 
+                                   String hostName,
+                                   String taskType,
+                                   String stateString, 
+                                   Counters counter) {
+      logFinished(taskAttemptId, finishTime, finishTime, hostName,
+          taskType, stateString, counter);
+    }
+    
+    /**
+     * Log finish time of map task attempt. 
+     * 
+     * @param taskAttemptId task attempt id 
+     * @param mapFinishTime finish time of map phase in map task
+     * @param finishTime finish time of map task
      * @param hostName host name 
      * @param taskType Whether the attempt is cleanup or setup or map 
      * @param stateString state string of the task attempt
      * @param counter counters of the task attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
+                                   long mapFinishTime,
                                    long finishTime, 
                                    String hostName,
                                    String taskType,
@@ -1430,12 +1456,14 @@
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                     Keys.MAP_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
                          new String[]{taskType, 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
+                                      String.valueOf(mapFinishTime),
                                       String.valueOf(finishTime), hostName, 
                                       stateString, 
                                       counter.makeEscapedCompactString()}); 
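
For reference, the record emitted by the block above has roughly the following
shape in the history file (a single line in reality, wrapped here for
readability; all values are made up, only the key order follows the Keys[]
array above):

    MapAttempt TASK_TYPE="MAP" TASKID="task_200905201113_0001_m_000000"
        TASK_ATTEMPT_ID="attempt_200905201113_0001_m_000000_0"
        TASK_STATUS="SUCCESS" MAP_FINISHED="1242810000000"
        FINISH_TIME="1242810012345" HOSTNAME="host1"
        STATE_STRING="" COUNTERS="..."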

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Wed May 20 09:13:55 2009
@@ -2011,7 +2011,9 @@
                                        status.getTaskTracker(), 
                                        ttStatus.getHttpPort(), 
                                        taskType); 
-      JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
+      JobHistory.MapAttempt.logFinished(status.getTaskID(),
+                                        status.getMapFinishTime(),
+                                        status.getFinishTime(), 
                                         trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed May 20 09:13:55 2009
@@ -1086,6 +1086,10 @@
         taskStatus.setShuffleFinishTime(shuffleTime);
         taskStatus.setSortFinishTime(sortTime);
       }
+      else if (type.equals(Values.MAP.name())) {
+        taskStatus.setMapFinishTime(
+            Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
+      }
 
       // Add the counters
       String counterString = attempt.get(Keys.COUNTERS);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed May 20 09:13:55 2009
@@ -73,8 +73,13 @@
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
+  private Progress mapPhase;
+  private Progress sortPhase;
+  
+  
   {   // set phase for this task
     setPhase(TaskStatus.Phase.MAP); 
+    getProgress().setStatus("map");
   }
 
   public MapTask() {
@@ -273,6 +278,11 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, ClassNotFoundException, InterruptedException {
 
+    if (isMapTask()) {
+      mapPhase = getProgress().addPhase("map", 0.667f);
+      sortPhase  = getProgress().addPhase("sort", 0.333f);
+    }
+    
     // start thread that will handle communication with parent
     TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
     reporter.startCommunicationThread();
@@ -348,6 +358,9 @@
 
     try {
       runner.run(in, collector, reporter);      
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
       collector.flush();
     } finally {
       //close
@@ -510,6 +523,9 @@
 
       input.initialize(split, mapperContext);
       mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
       input.close();
       output.close(mapperContext);
     } catch (NoSuchMethodException e) {
@@ -1385,6 +1401,9 @@
         return;
       }
       {
+        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+        Merger.considerFinalMergeForProgress();
+        
         IndexRecord rec = new IndexRecord();
         final SpillRecord spillRec = new SpillRecord(partitions);
         for (int parts = 0; parts < partitions; parts++) {
@@ -1406,14 +1425,17 @@
             }
           }
 
+          int mergeFactor = job.getInt("io.sort.factor", 100);
+          // sort the segments only if there are intermediate merges
+          boolean sortSegments = segmentList.size() > mergeFactor;
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                          keyClass, valClass,
-                         segmentList, job.getInt("io.sort.factor", 100),
+                         segmentList, mergeFactor,
                          new Path(mapId.toString()),
-                         job.getOutputKeyComparator(), reporter,
-                         null, spilledRecordsCounter);
+                         job.getOutputKeyComparator(), reporter, sortSegments,
+                         null, spilledRecordsCounter, sortPhase.phase());
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
@@ -1430,6 +1452,8 @@
           //close
           writer.close();
 
+          sortPhase.startNextPhase();
+          
           // record offsets
           rec.startOffset = segmentStart;
           rec.rawLength = writer.getRawLength();
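
Putting the pieces above together, a hedged sketch of the progress tree a map
task now builds (a standalone Progress tree mirroring what MapTask wires up;
the partition count and progress values are hypothetical):

    Progress task = new Progress();
    Progress mapPhase  = task.addPhase("map", 0.667f);
    Progress sortPhase = task.addPhase("sort", 0.333f);
    sortPhase.addPhases(4);        // one equal-weight sub-phase per partition

    mapPhase.complete();           // map done: contributes the full 0.667
    sortPhase.phase().set(0.5f);   // merge of the first partition half done
    // Overall: 0.667 + 0.333 * (0.25 * 0.5) ~= 0.709
    float overall = task.get();
    sortPhase.startNextPhase();    // as done after each partition's merge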

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java Wed May 20 09:13:55 2009
@@ -18,9 +18,16 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 
 class MapTaskStatus extends TaskStatus {
 
+  private long mapFinishTime;
+  private long sortFinishTime;
+  
   public MapTaskStatus() {}
 
   public MapTaskStatus(TaskAttemptID taskid, float progress,
@@ -35,6 +42,19 @@
     return true;
   }
 
+  /**
+   * Sets finishTime. 
+   * @param finishTime finish time of task.
+   */
+  @Override
+  void setFinishTime(long finishTime) {
+    super.setFinishTime(finishTime);
+    if (mapFinishTime == 0) {
+      mapFinishTime = finishTime;
+    }
+    setSortFinishTime(finishTime);
+  }
+  
   @Override
   public long getShuffleFinishTime() {
     throw new UnsupportedOperationException("getShuffleFinishTime() not supported for MapTask");
@@ -46,12 +66,43 @@
   }
 
   @Override
+  public long getMapFinishTime() {
+    return mapFinishTime;
+  }
+  
+  @Override
+  void setMapFinishTime(long mapFinishTime) {
+    this.mapFinishTime = mapFinishTime;
+  }
+
+  @Override
   public long getSortFinishTime() {
-    throw new UnsupportedOperationException("getSortFinishTime() not supported for MapTask");
+    return sortFinishTime;
   }
 
   @Override
   void setSortFinishTime(long sortFinishTime) {
-    throw new UnsupportedOperationException("setSortFinishTime() not supported for MapTask");
+    this.sortFinishTime = sortFinishTime;
+  }
+  
+  @Override
+  synchronized void statusUpdate(TaskStatus status) {
+    super.statusUpdate(status);
+    
+    if (status.getMapFinishTime() != 0) {
+      this.mapFinishTime = status.getMapFinishTime();
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    mapFinishTime = in.readLong();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(mapFinishTime);
   }
 }
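
A hedged sketch of the backfill behaviour of setFinishTime() above, for a map
task that finishes within a single heartbeat (the timestamp is made up, and
the setters/getters are package-private, so this assumes code inside
org.apache.hadoop.mapred):

    MapTaskStatus s = new MapTaskStatus();
    // No MAP -> SORT transition was ever reported, so mapFinishTime == 0.
    s.setFinishTime(1242810012345L);
    s.getMapFinishTime();   // 1242810012345L, backfilled with the finish time
    s.getSortFinishTime();  // 1242810012345L, set unconditionally above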

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Wed May 20 09:13:55 2009
@@ -55,13 +55,14 @@
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
                            reporter).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
-                                           readsCounter, writesCounter);
+                                           readsCounter, writesCounter, mergePhase);
   }
   
   public static <K extends Object, V extends Object>
@@ -71,10 +72,11 @@
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false, readsCounter, writesCounter);
+                 comparator, reporter, false, readsCounter, writesCounter, mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -85,12 +87,14 @@
                             RawComparator<K> comparator, Progressable reporter,
                             boolean sortSegments,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
-                                               readsCounter, writesCounter);
+                                               readsCounter, writesCounter,
+                                               mergePhase);
   }
 
   static <K extends Object, V extends Object>
@@ -101,13 +105,15 @@
                             RawComparator<K> comparator, Progressable reporter,
                             boolean sortSegments,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
-                                               readsCounter, writesCounter);
+                                               readsCounter, writesCounter,
+                                               mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -235,6 +241,20 @@
     }
   }
   
+  // Whether to count the final merge as part of the sort phase. This is
+  // true in map tasks and false in reduce tasks, and is used when
+  // calculating mergeProgress.
+  static boolean includeFinalMerge = false;
+  
+  /**
+   * Sets the boolean variable includeFinalMerge to true. Called from the
+   * map task before calling merge(), so that the map task's final merge
+   * is also counted as part of the sort phase.
+   */
+  static void considerFinalMergeForProgress() {
+    includeFinalMerge = true;
+  }
+  
   private static class MergeQueue<K extends Object, V extends Object> 
   extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
     Configuration conf;
@@ -386,24 +406,41 @@
     public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                      int factor, Path tmpDir,
                                      Counters.Counter readsCounter,
-                                     Counters.Counter writesCounter)
+                                     Counters.Counter writesCounter,
+                                     Progress mergePhase)
         throws IOException {
       return merge(keyClass, valueClass, factor, 0, tmpDir,
-                   readsCounter, writesCounter);
+                   readsCounter, writesCounter, mergePhase);
     }
 
     RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                      int factor, int inMem, Path tmpDir,
                                      Counters.Counter readsCounter,
-                                     Counters.Counter writesCounter)
+                                     Counters.Counter writesCounter,
+                                     Progress mergePhase)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
-      
-      //create the MergeStreams from the sorted map created in the constructor
-      //and dump the final output to a file
+
+      /*
+       * If there are in-memory segments, they come first in the segments
+       * list, followed by the sorted disk segments. Otherwise (if there
+       * are only disk segments), the segments are sorted only when there
+       * are more than factor segments in the list.
+       */
       int numSegments = segments.size();
       int origFactor = factor;
       int passNo = 1;
+      if (mergePhase != null) {
+        mergeProgress = mergePhase;
+      }
+
+      long totalBytes = computeBytesInMerges(factor, inMem);
+      if (totalBytes != 0) {
+        progPerByte = 1.0f / (float)totalBytes;
+      }
+      
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
       do {
         //get the factor for this pass of merge. We assume in-memory segments
         //are the first entries in the segment list and that the pass factor
@@ -460,34 +497,39 @@
         //if we have lesser number of segments remaining, then just return the
         //iterator, else do another single level merge
         if (numSegments <= factor) {
-          // Reset totalBytesProcessed to track the progress of the final merge.
-          // This is considered the progress of the reducePhase, the 3rd phase
-          // of reduce task. Currently totalBytesProcessed is not used in sort
-          // phase of reduce task(i.e. when intermediate merges happen).
-          totalBytesProcessed = startBytes;
-          
-          //calculate the length of the remaining segments. Required for 
-          //calculating the merge progress
-          long totalBytes = 0;
-          for (int i = 0; i < segmentsToMerge.size(); i++) {
-            totalBytes += segmentsToMerge.get(i).getLength();
+          if (!includeFinalMerge) { // for reduce task
+
+            // Reset totalBytesProcessed and recalculate totalBytes from the
+            // remaining segments to track the progress of the final merge.
+            // The final merge counts as the progress of the reducePhase,
+            // the 3rd phase of the reduce task.
+            totalBytesProcessed = 0;
+            totalBytes = 0;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).getLength();
+            }
           }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;
           
+          totalBytesProcessed += startBytes;         
           if (totalBytes != 0)
             mergeProgress.set(totalBytesProcessed * progPerByte);
           else
             mergeProgress.set(1.0f); // Last pass and no segments left - we're done
           
           LOG.info("Down to the last merge-pass, with " + numSegments + 
-                   " segments left of total size: " + totalBytes + " bytes");
+                   " segments left of total size: " +
+                   (totalBytes - totalBytesProcessed) + " bytes");
           return this;
         } else {
           LOG.info("Merging " + segmentsToMerge.size() + 
                    " intermediate segments out of a total of " + 
                    (segments.size()+segmentsToMerge.size()));
           
+          long bytesProcessedInPrevMerges = totalBytesProcessed;
+          totalBytesProcessed += startBytes;
+
           //we want to spread the creation of temp files on multiple disks if 
           //available under the space constraints
           long approxOutputSize = 0; 
@@ -516,9 +558,27 @@
           // Add the newly create segment to the list of segments to be merged
           Segment<K, V> tempSegment = 
             new Segment<K, V>(conf, fs, outputFile, codec, false);
-          segments.add(tempSegment);
+
+          // Insert new merged segment into the sorted list
+          int pos = Collections.binarySearch(segments, tempSegment,
+                                             segmentComparator);
+          if (pos < 0) {
+            // not found: binarySearch returned -(insertion point)-1,
+            pos = -pos-1;
+          }
+          segments.add(pos, tempSegment);
           numSegments = segments.size();
-          Collections.sort(segments, segmentComparator);
+          
+          // Subtract from totalBytes the difference between the expected
+          // size of the new segment (i.e. inputBytesOfThisMerge) and its
+          // actual size. The two sizes will match (almost) when no
+          // combiner is called during the merge.
+          long inputBytesOfThisMerge = totalBytesProcessed -
+                                       bytesProcessedInPrevMerges;
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          if (totalBytes != 0) {
+            progPerByte = 1.0f / (float)totalBytes;
+          }
           
           passNo++;
         }
@@ -560,6 +620,57 @@
       }
       return subList;
     }
+    
+    /**
+     * Computes the expected size of input bytes to all merges; used in
+     * calculating mergeProgress. This simulates the merge() method above
+     * and estimates the number of bytes that will be merged in all passes
+     * (assuming that no combiner is called while merging).
+     * @param factor io.sort.factor
+     * @param inMem  number of segments in memory to be merged
+     */
+    long computeBytesInMerges(int factor, int inMem) {
+      int numSegments = segments.size();
+      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+      long totalBytes = 0;
+      int n = numSegments - inMem;
+      // factor for 1st pass
+      int f = getPassFactor(factor, 1, n) + inMem;
+      n = numSegments;
+ 
+      for (int i = 0; i < numSegments; i++) {
+        // Empty segments are not handled specially here, on the assumption
+        // that they barely affect the mergeProgress calculation.
+        segmentSizes.add(segments.get(i).getLength());
+      }
+      
+      if (includeFinalMerge) {
+        // increment n so that the while loop below runs for one extra
+        // iteration, including the final merge in the computation of the
+        // expected input bytes of the merges
+        n++;
+      }
+      while (n > f) {
+        long mergedSize = 0;
+        f = Math.min(f, segmentSizes.size());
+        for (int j = 0; j < f; j++) {
+          mergedSize += segmentSizes.remove(0);
+        }
+        totalBytes += mergedSize;
+        
+        // insert new size into the sorted list
+        int pos = Collections.binarySearch(segmentSizes, mergedSize);
+        if (pos < 0) {
+          pos = -pos-1;
+        }
+        segmentSizes.add(pos, mergedSize);
+        
+        n -= (f-1);
+        f = factor;
+      }
+
+      return totalBytes;
+    }
 
     public Progress getProgress() {
       return mergeProgress;
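
A worked trace of computeBytesInMerges() with hypothetical numbers: six disk
segments of sizes 10, 20, 30, 40, 50 and 60 bytes, io.sort.factor = 3,
inMem = 0, in a map task (so includeFinalMerge is true):

    pass 1: getPassFactor(3, 1, 6) = 2, merge the 2 smallest: 10 + 20 ->  30
    pass 2: factor = 3, merge 30 + 30 + 40                           -> 100
    final : 3 segments (50, 60, 100) <= factor; counted in map tasks -> 210
    estimated total = 30 + 100 + 210 = 340 bytes, so progPerByte = 1/340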

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed May 20 09:13:55 2009
@@ -241,7 +241,7 @@
     }
     
     public void informReduceProgress() {
-      reducePhase.set(super.in.getProgress().get()); // update progress
+      reducePhase.set(super.in.getProgress().getProgress()); // update progress
       reporter.progress();
     }
   }
@@ -392,7 +392,7 @@
           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
-          reporter, spilledRecordsCounter, null)
+          reporter, spilledRecordsCounter, null, sortPhase)
       : reduceCopier.createKVIterator(job, rfs, reporter);
         
     // free up the data structures
@@ -1786,9 +1786,7 @@
       InMemFSMergeThread inMemFSMergeThread = null;
       GetMapEventsThread getMapEventsThread = null;
       
-      for (int i = 0; i < numMaps; i++) {
-        copyPhase.addPhase();       // add sub-phase per file
-      }
+      copyPhase.addPhases(numMaps); // add sub-phase per file
       
       copiers = new ArrayList<MapOutputCopier>(numCopiers);
       
@@ -2243,6 +2241,10 @@
       // segments required to vacate memory
       List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
       long inMemToDiskBytes = 0;
+      // sortPhaseFinished is set to true if merge() is called separately
+      // here to vacate memory (i.e. there will be no intermediate merges;
+      // in other words, only the final merge will be pending).
+      boolean sortPhaseFinished = false;
       if (mapOutputsFilesInMemory.size() > 0) {
         TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
         inMemToDiskBytes = createInMemorySegments(memDiskSegments,
@@ -2250,12 +2252,20 @@
         final int numMemDiskSegments = memDiskSegments.size();
         if (numMemDiskSegments > 0 &&
               ioSortFactor > mapOutputFilesOnDisk.size()) {
+          // As there are fewer than ioSortFactor files on disk now, after
+          // this merge of the in-memory segments there will be at most
+          // ioSortFactor files on disk, so only the final merge (feeding
+          // the reducer directly), i.e. the reduce phase, will be pending.
+          sortPhaseFinished = true;
+          
           // must spill to disk, but can't retain in-mem for intermediate merge
           final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                             reduceTask.getTaskID(), inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-              tmpDir, comparator, reporter, spilledRecordsCounter, null);
+              tmpDir, comparator, reporter, spilledRecordsCounter, null,
+              sortPhase);
+
           final Writer writer = new Writer(job, fs, outputPath,
               keyClass, valueClass, codec, null);
           try {
@@ -2311,10 +2321,12 @@
         final int numInMemSegments = memDiskSegments.size();
         diskSegments.addAll(0, memDiskSegments);
         memDiskSegments.clear();
+        Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
         RawKeyValueIterator diskMerge = Merger.merge(
             job, fs, keyClass, valueClass, diskSegments,
             ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
-            tmpDir, comparator, reporter, false, spilledRecordsCounter, null);
+            tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
+            mergePhase);
         diskSegments.clear();
         if (0 == finalSegments.size()) {
           return diskMerge;
@@ -2324,7 +2336,7 @@
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter, spilledRecordsCounter, null);
+                   comparator, reporter, spilledRecordsCounter, null, null);
     }
 
     class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2465,7 +2477,7 @@
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
                                   true, ioSortFactor, tmpDir, 
                                   conf.getOutputKeyComparator(), reporter,
-                                  spilledRecordsCounter, null);
+                                  spilledRecordsCounter, null, null);
               
               Merger.writeFile(iter, writer, reporter, conf);
               writer.close();
@@ -2562,7 +2574,7 @@
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter,
-                               spilledRecordsCounter, null);
+                               spilledRecordsCounter, null, null);
           
           if (combinerRunner == null) {
             Merger.writeFile(rIter, writer, reporter, conf);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java Wed May 20 09:13:55 2009
@@ -88,6 +88,18 @@
   }
 
   @Override
+  public long getMapFinishTime() {
+    throw new UnsupportedOperationException(
+        "getMapFinishTime() not supported for ReduceTask");
+  }
+
+  @Override
+  void setMapFinishTime(long shuffleFinishTime) {
+    throw new UnsupportedOperationException(
+        "setMapFinishTime() not supported for ReduceTask");
+  }
+
+  @Override
   public List<TaskAttemptID> getFetchFailedMaps() {
     return failedFetchTasks;
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Wed May 20 09:13:55 2009
@@ -148,6 +148,24 @@
   void setShuffleFinishTime(long shuffleFinishTime) {}
 
   /**
+   * Get the map phase finish time for the task. If the map finish time
+   * was not set because the sort phase ended within the same heartbeat
+   * interval, it is set to the finish time of the next phase, i.e. the
+   * sort phase, when that is set.
+   * @return 0 if neither mapFinishTime nor sortFinishTime is set; else
+   * the approximate map finish time.
+   */
+  public long getMapFinishTime() {
+    return 0;
+  }
+  
+  /**
+   * Set map phase finish time. 
+   * @param mapFinishTime 
+   */
+  void setMapFinishTime(long mapFinishTime) {}
+
+  /**
    * Get the sort finish time for the task. If the sort finish time was not 
    * set due to the sort and reduce phases finishing in the same heartbeat 
    * interval, it is set to the finish time, when that is set. 
@@ -197,12 +215,17 @@
     if (oldPhase != phase){
       // sort phase started
       if (phase == TaskStatus.Phase.SORT){
-        setShuffleFinishTime(System.currentTimeMillis());
+        if (oldPhase == TaskStatus.Phase.MAP) {
+          setMapFinishTime(System.currentTimeMillis());
+        }
+        else {
+          setShuffleFinishTime(System.currentTimeMillis());
+        }
       }else if (phase == TaskStatus.Phase.REDUCE){
         setSortFinishTime(System.currentTimeMillis());
       }
+      this.phase = phase;
     }
-    this.phase = phase; 
   }
 
   boolean inTaskCleanupPhase() {
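
To summarize the transitions that now stamp timestamps in setPhase() (a
sketch; the Phase values are the ones used above):

    // oldPhase -> newPhase : timestamp recorded
    // MAP      -> SORT     : setMapFinishTime(now)      (map tasks)
    // SHUFFLE  -> SORT     : setShuffleFinishTime(now)  (reduce tasks)
    // SORT     -> REDUCE   : setSortFinishTime(now)     (reduce tasks)
    // Also, this.phase is now reassigned only when the phase actually changes.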

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed May 20 09:13:55 2009
@@ -373,8 +373,8 @@
                    (status.equals("SUCCESS") || status.equals("FAILED") ||
                     status.equals("KILLED")));
 
-        // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
-        // SORT_FINISHED time
+        // Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
+        // time and SORT_FINISHED time
         if (type.equals("REDUCE") && status.equals("SUCCESS")) {
           time1 = attempt.get(Keys.SHUFFLE_FINISHED);
           assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
@@ -389,6 +389,15 @@
           assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
                      " in history file", areTimesInOrder(time1, time));
         }
+        else if (type.equals("MAP") && status.equals("SUCCESS")) {
+          // Successful MAP Task Attempts should have valid MAP_FINISHED time
+          time1 = attempt.get(Keys.MAP_FINISHED);
+          assertTrue("MAP_FINISHED time of task attempt " + id +
+                     " is in unexpected format:" + time1 +
+                     " in history file", isTimeValid(time1));
+          assertTrue("MAP_FINISHED time of map task is < START_TIME " +
+                     "in history file", areTimesInOrder(time, time1));
+        }
 
         // check if hostname is valid
         String hostname = attempt.get(Keys.HOSTNAME);

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Wed May 20 09:13:55 2009
@@ -91,7 +91,7 @@
     RawKeyValueIterator rawItr = 
       Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
-                   new Text.Comparator(), new NullProgress(),null,null);
+                   new Text.Comparator(), new NullProgress(), null, null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,

Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=776631&r1=776630&r2=776631&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Wed May 20 09:13:55 2009
@@ -126,7 +126,12 @@
 <table border=2 cellpadding="5" cellspacing="2">
 <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> 
   <%
-   if (!ts[0].getIsMap() && !isCleanupOrSetup) {
+   if (ts[0].getIsMap()) {
+  %>
+<td>Map Phase Finished</td>
+  <%
+   }
+   else if(!isCleanupOrSetup) {
    %>
 <td>Shuffle Finished</td><td>Sort Finished</td>
   <%
@@ -181,7 +186,12 @@
         out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getStartTime(), 0) + "</td>");
-        if (!ts[i].getIsMap() && !isCleanupOrSetup) {
+        if (ts[i].getIsMap()) {
+          out.print("<td>"
+          + StringUtils.getFormattedTimeWithDiff(dateFormat, status
+          .getMapFinishTime(), status.getStartTime()) + "</td>");
+        }
+        else if (!isCleanupOrSetup) {
           out.print("<td>"
           + StringUtils.getFormattedTimeWithDiff(dateFormat, status
           .getShuffleFinishTime(), status.getStartTime()) + "</td>");