You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:53:24 UTC

svn commit: r1079187 - in /hadoop/mapreduce/branches/yahoo-merge/src: java/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/jobhistory/ java/org/apache/hadoop/mapreduce/server/jobtracker/ test/mapred/org/apache/hadoop/mapreduce/jobhisto...

Author: omalley
Date: Tue Mar  8 05:53:24 2011
New Revision: 1079187

URL: http://svn.apache.org/viewvc?rev=1079187&view=rev
Log:
commit e8d15543fb57fe349e7dca64b4cd82f4ac0c8dfe
Author: Richard King <dk...@yahoo-inc.com>
Date:   Sat Nov 20 00:45:43 2010 +0000

       [MR2037] (dking) records time, CPU time, VM kbytes and
      phys mem kbytes at interim progress points during task execution.
      Flows to job history task attempt completions or failures, and then
      flows into rumen traces as clockSplits, cpuUsages, vMemKbytes, and
      physMemKbytes .  Number of splits you get is controlled by cluster
      parameter mapreduce.jobtracker.jobhistory.task.numberprogresssplits .
      From
    
    +++ b/YAHOO-CHANGES.txt
    +   [MR2037] (dking) records time, CPU time, VM kbytes and
    +  phys mem kbytes at interim progress points during task execution.
    +  Flows to job history task attempt completions or failures, and then
    +  flows into rumen traces as clockSplits, cpuUsages, vMemKbytes, and
    +  physMemKbytes .  Number of splits you get is controlled by cluster
    +  parameter mapreduce.jobtracker.jobhistory.task.numberprogresssplits .
    +  From
    +
    +

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/mapred-default.xml Tue Mar  8 05:53:24 2011
@@ -33,6 +33,19 @@
 </property>
 
 <property>
+  <name>mapreduce.jobtracker.jobhistory.task.numberprogresssplits</name>
+  <value>12</value>
+  <description> Every task attempt progresses from 0.0 to 1.0 [unless
+  it fails or is killed].  We record, for each task attempt, certain 
+  statistics over each twelfth of the progress range.  You can change
+  the number of intervals we divide the entire range of progress into
+  by setting this property.  Higher values give more precision to the
+  recorded data, but cost more memory in the job tracker at runtime.
+  Each increment in this attribute costs 16 bytes per running task.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.job.userhistorylocation</name>
   <value></value>
   <description> User can specify a location to store the history files of 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Counters.java Tue Mar  8 05:53:24 2011
@@ -407,23 +407,28 @@ public class Counters implements Writabl
    * with the specified name.
    */
   public synchronized Group getGroup(String groupName) {
-    // To provide support for deprecated group names  
-    if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
-      groupName = "org.apache.hadoop.mapreduce.TaskCounter";
-      LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
-               " Use org.apache.hadoop.mapreduce.TaskCounter instead");
-    } else if (groupName.equals(
-                 "org.apache.hadoop.mapred.JobInProgress$Counter")) {
-      groupName = "org.apache.hadoop.mapreduce.JobCounter";
-      LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
-               "is deprecated. Use " +
-               "org.apache.hadoop.mapreduce.JobCounter instead");
-    }
     Group result = counters.get(groupName);
+
     if (result == null) {
+      // To provide support for deprecated group names  
+      if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
+        LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
+                 " Use org.apache.hadoop.mapreduce.TaskCounter instead");
+        return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
+      } 
+
+      if (groupName.equals
+          ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
+        LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
+                 "is deprecated. Use " +
+                 "org.apache.hadoop.mapreduce.JobCounter instead");
+        return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+      }
+
       result = new Group(groupName);
       counters.put(groupName, result);
     }
+
     return result;
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:53:24 2011
@@ -2655,25 +2655,29 @@ public class JobInProgress {
         status.getTaskTracker(),  ttStatus.getHttpPort());
     
     jobHistory.logEvent(tse, status.getTaskID().getJobID());
-    
+    TaskAttemptID statusAttemptID = status.getTaskID();
 
     if (status.getIsMap()){
       MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getMapFinishTime(),
           status.getFinishTime(),  trackerHostname,
           status.getStateString(), 
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+          tip.getSplits(statusAttemptID).burst()
+          );
       
       jobHistory.logEvent(mfe,  status.getTaskID().getJobID());
       
     }else{
       ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(), 
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(), 
           status.getShuffleFinishTime(),
           status.getSortFinishTime(), status.getFinishTime(),
           trackerHostname, status.getStateString(),
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+          tip.getSplits(statusAttemptID).burst()
+          );
       
       jobHistory.logEvent(rfe,  status.getTaskID().getJobID());
       
@@ -3153,12 +3157,16 @@ public class JobInProgress {
         taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
     
     jobHistory.logEvent(tse, taskid.getJobID());
+
+    ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
    
-    TaskAttemptUnsuccessfulCompletionEvent tue = 
-      new TaskAttemptUnsuccessfulCompletionEvent(taskid, 
-          taskType, taskStatus.getRunState().toString(),
-          finishTime, 
-          taskTrackerHostName, diagInfo);
+    TaskAttemptUnsuccessfulCompletionEvent tue =
+      new TaskAttemptUnsuccessfulCompletionEvent
+            (taskid, 
+             taskType, taskStatus.getRunState().toString(),
+             finishTime, 
+             taskTrackerHostName, diagInfo,
+             splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());
         
     // After this, try to assign tasks with the one after this, so that

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:53:24 2011
@@ -31,25 +31,32 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
 import org.apache.hadoop.net.Node;
 
 
+
 /*************************************************************
  * TaskInProgress maintains all the info needed for a
  * Task in the lifetime of its owning Job.  A given Task
  * might be speculatively executed or reexecuted, so we
  * need a level of indirection above the running-id itself.
  * <br>
- * A given TaskInProgress contains multiple taskids,
+ * A given TaskInProgress contains multiple task attempt ids,
  * 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.)  A taskid
- * is now *never* recycled.  A TIP allocates enough taskids
+ * (That's what allows speculative execution.)  A task attempt id
+ * is now *never* recycled.  A TIP allocates enough task attempt ids
  * to account for all the speculation and failures it will
  * ever have to handle.  Once those are up, the TIP is dead.
  * **************************************************************
@@ -60,6 +67,10 @@ class TaskInProgress {
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
+  private static final long MEMORY_SPLITS_RESOLUTION = 1024;
+
+  static final int DEFAULT_STATISTICS_INTERVALS = 12;
+
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
   // Defines the TIP
@@ -91,6 +102,10 @@ class TaskInProgress {
   private volatile boolean skipping = false;
   private boolean jobCleanup = false; 
   private boolean jobSetup = false;
+
+  private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+  private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+  private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -109,12 +124,20 @@ class TaskInProgress {
   private JobConf conf;
   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
     new TreeMap<TaskAttemptID,List<String>>();
+
   /**
-   * Map from taskId -> TaskStatus
+   * Map from task attempt Id -> TaskStatus
    */
   TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
+  
+  /**
+   * Map from task attempt Id -> splits block
+   */
+  private Map<TaskAttemptID, ProgressSplitsBlock> splitsBlocks
+    = new TreeMap<TaskAttemptID, ProgressSplitsBlock>();
+
   // Map from taskId -> TaskTracker Id, 
   // contains cleanup attempts and where they ran, if any
   private TreeMap<TaskAttemptID, String> cleanupTasks =
@@ -183,6 +206,65 @@ class TaskInProgress {
     }
     this.user = job.getUser();
   }
+
+  synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
+    ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
+
+    if (result == null) {
+      result
+        = new ProgressSplitsBlock
+            (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
+                         ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
+      splitsBlocks.put(statusAttemptID, result);
+    }
+
+    return result;
+  }
+
+  private void updateProgressSplits(TaskStatus taskStatus) {
+    if (!taskStatus.getIncludeCounters()) {
+      return;
+    }
+
+    double newProgress = taskStatus.getProgress();
+
+    Counters counters = taskStatus.getCounters();
+
+    TaskAttemptID statusAttemptID = taskStatus.getTaskID();
+    ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
+
+    if (splitsBlock != null) {
+
+      long now = JobTracker.getClock().getTime();
+      Long start = getDispatchTime(statusAttemptID);
+
+      if (start != null && now - start <= Integer.MAX_VALUE) {
+        splitsBlock.progressWallclockTime.extend
+          (newProgress, (int)(now - start));
+      }
+
+      Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
+      if (cpuCounter != null
+          && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
+        splitsBlock.progressCPUTime.extend
+          (newProgress, (int)(cpuCounter.getCounter()));
+      }
+
+      Counters.Counter virtualBytes = counters.findCounter(VM_BYTES_KEY);
+      if (virtualBytes != null) {
+        splitsBlock.progressVirtualMemoryKbytes.extend
+          (newProgress,
+           (int)(virtualBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+
+      Counters.Counter physicalBytes = counters.findCounter(PHYSICAL_BYTES_KEY);
+      if (physicalBytes != null) {
+        splitsBlock.progressPhysicalMemoryKbytes.extend
+          (newProgress,
+           (int)(physicalBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+    }
+  }
   
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
@@ -294,6 +376,7 @@ class TaskInProgress {
     return execFinishTime;
   }
 
+
   /**
    * Set the exec finish time
    */
@@ -582,95 +665,99 @@ class TaskInProgress {
    * @return has the task changed its state noticeably?
    */
   synchronized boolean updateStatus(TaskStatus status) {
-    TaskAttemptID taskid = status.getTaskID();
-    String tracker = status.getTaskTracker();
-    String diagInfo = status.getDiagnosticInfo();
-    TaskStatus oldStatus = taskStatuses.get(taskid);
-    boolean changed = true;
-    if (diagInfo != null && diagInfo.length() > 0) {
-      LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
-      addDiagnosticInfo(taskid, diagInfo);
-    }
-    
-    if(skipping) {
-      failedRanges.updateState(status);
-    }
-    
-    if (oldStatus != null) {
-      TaskStatus.State oldState = oldStatus.getRunState();
-      TaskStatus.State newState = status.getRunState();
-          
-      // We should never recieve a duplicate success/failure/killed
-      // status update for the same taskid! This is a safety check, 
-      // and is addressed better at the TaskTracker to ensure this.
-      // @see {@link TaskTracker.transmitHeartbeat()}
-      if ((newState != TaskStatus.State.RUNNING && 
-           newState != TaskStatus.State.COMMIT_PENDING && 
-           newState != TaskStatus.State.FAILED_UNCLEAN && 
-           newState != TaskStatus.State.KILLED_UNCLEAN && 
-           newState != TaskStatus.State.UNASSIGNED) && 
-          (oldState == newState)) {
-        LOG.warn("Recieved duplicate status update of '" + newState + 
-                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
-                 "oldTT=" + oldStatus.getTaskTracker() + 
-                 " while newTT=" + status.getTaskTracker());
-        return false;
-      }
-
-      // The task is not allowed to move from completed back to running.
-      // We have seen out of order status messagesmoving tasks from complete
-      // to running. This is a spot fix, but it should be addressed more
-      // globally.
-      if ((newState == TaskStatus.State.RUNNING || 
-          newState == TaskStatus.State.UNASSIGNED) &&
-          (oldState == TaskStatus.State.FAILED || 
-           oldState == TaskStatus.State.KILLED || 
-           oldState == TaskStatus.State.FAILED_UNCLEAN || 
-           oldState == TaskStatus.State.KILLED_UNCLEAN || 
-           oldState == TaskStatus.State.SUCCEEDED ||
-           oldState == TaskStatus.State.COMMIT_PENDING)) {
-        return false;
+    try {
+      TaskAttemptID taskid = status.getTaskID();
+      String tracker = status.getTaskTracker();
+      String diagInfo = status.getDiagnosticInfo();
+      TaskStatus oldStatus = taskStatuses.get(taskid);
+      boolean changed = true;
+      if (diagInfo != null && diagInfo.length() > 0) {
+        LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
+        addDiagnosticInfo(taskid, diagInfo);
       }
-      
-      //Do not accept any status once the task is marked FAILED/KILLED
-      //This is to handle the case of the JobTracker timing out a task
-      //due to launch delay, but the TT comes back with any state or 
-      //TT got expired
-      if (oldState == TaskStatus.State.FAILED ||
-          oldState == TaskStatus.State.KILLED) {
-        tasksToKill.put(taskid, true);
-        return false;	  
+    
+      if(skipping) {
+        failedRanges.updateState(status);
       }
+    
+      if (oldStatus != null) {
+        TaskStatus.State oldState = oldStatus.getRunState();
+        TaskStatus.State newState = status.getRunState();
           
-      changed = oldState != newState;
-    }
-    // if task is a cleanup attempt, do not replace the complete status,
-    // update only specific fields.
-    // For example, startTime should not be updated, 
-    // but finishTime has to be updated.
-    if (!isCleanupAttempt(taskid)) {
-      taskStatuses.put(taskid, status);
-      //we don't want to include setup tasks in the task execution stats
-      if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
-          (!isMapTask() && job.hasSpeculativeReduces()))) {
-        long now = JobTracker.getClock().getTime();
-        double oldProgRate = getOldProgressRate();
-        double currProgRate = getCurrentProgressRate(now);
-        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
-        //we need to store the current progress rate, so that we can
-        //update statistics accurately the next time we invoke
-        //updateStatistics
-        setProgressRate(currProgRate);
+        // We should never receive a duplicate success/failure/killed
+        // status update for the same taskid! This is a safety check, 
+        // and is addressed better at the TaskTracker to ensure this.
+        // @see {@link TaskTracker.transmitHeartbeat()}
+        if ((newState != TaskStatus.State.RUNNING && 
+             newState != TaskStatus.State.COMMIT_PENDING && 
+             newState != TaskStatus.State.FAILED_UNCLEAN && 
+             newState != TaskStatus.State.KILLED_UNCLEAN && 
+             newState != TaskStatus.State.UNASSIGNED) && 
+            (oldState == newState)) {
+          LOG.warn("Recieved duplicate status update of '" + newState + 
+                   "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
+                   "oldTT=" + oldStatus.getTaskTracker() + 
+                   " while newTT=" + status.getTaskTracker());
+          return false;
+        }
+
+        // The task is not allowed to move from completed back to running.
+        // We have seen out of order status messages moving tasks from complete
+        // to running. This is a spot fix, but it should be addressed more
+        // globally.
+        if ((newState == TaskStatus.State.RUNNING || 
+             newState == TaskStatus.State.UNASSIGNED) &&
+            (oldState == TaskStatus.State.FAILED || 
+             oldState == TaskStatus.State.KILLED || 
+             oldState == TaskStatus.State.FAILED_UNCLEAN || 
+             oldState == TaskStatus.State.KILLED_UNCLEAN || 
+             oldState == TaskStatus.State.SUCCEEDED ||
+             oldState == TaskStatus.State.COMMIT_PENDING)) {
+          return false;
+        }
+      
+        //Do not accept any status once the task is marked FAILED/KILLED
+        //This is to handle the case of the JobTracker timing out a task
+        //due to launch delay, but the TT comes back with any state or 
+        //TT got expired
+        if (oldState == TaskStatus.State.FAILED ||
+            oldState == TaskStatus.State.KILLED) {
+          tasksToKill.put(taskid, true);
+          return false;	  
+        }
+          
+        changed = oldState != newState;
+      }
+      // if task is a cleanup attempt, do not replace the complete status,
+      // update only specific fields.
+      // For example, startTime should not be updated, 
+      // but finishTime has to be updated.
+      if (!isCleanupAttempt(taskid)) {
+        taskStatuses.put(taskid, status);
+        //we don't want to include setup tasks in the task execution stats
+        if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
+                                  (!isMapTask() && job.hasSpeculativeReduces()))) {
+          long now = JobTracker.getClock().getTime();
+          double oldProgRate = getOldProgressRate();
+          double currProgRate = getCurrentProgressRate(now);
+          job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+          //we need to store the current progress rate, so that we can
+          //update statistics accurately the next time we invoke
+          //updateStatistics
+          setProgressRate(currProgRate);
+        }
+      } else {
+        taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+                                              status.getProgress(), status.getStateString(), status.getPhase(),
+                                              status.getFinishTime());
       }
-    } else {
-      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
-        status.getProgress(), status.getStateString(), status.getPhase(),
-        status.getFinishTime());
-    }
 
-    // Recompute progress
-    recomputeProgress();
-    return changed;
+      // Recompute progress
+      recomputeProgress();
+      return changed;
+    } finally {
+      updateProgressSplits(status);
+    }
   }
 
   /**

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar  8 05:53:24 2011
@@ -125,7 +125,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -140,7 +144,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -176,7 +184,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "status", "type": "string"},
-          {"name": "error", "type": "string"}
+          {"name": "error", "type": "string"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Tue Mar  8 05:53:24 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 
 import org.apache.avro.util.Utf8;
 
@@ -48,11 +49,19 @@ public class MapAttemptFinishedEvent  im
    * @param hostname Name of the host where the map executed
    * @param state State string for the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory. 
+   *
+   *        If you have no splits data, pass {@code null} for this
+   *        parameter. 
    */
-  public MapAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long mapFinishTime, long finishTime,
-      String hostname, String state, Counters counters) {
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters,
+       int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -62,8 +71,46 @@ public class MapAttemptFinishedEvent  im
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /** 
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event for successful completion of map attempts
+   * @param id Task Attempt ID
+   * @param taskType Type of the task
+   * @param taskStatus Status of the task
+   * @param mapFinishTime Finish time of the map phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the map executed
+   * @param state State string for the attempt
+   * @param counters Counters for the attempt
+   */
+  @Deprecated
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters) {
+    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
   }
   
+  
   MapAttemptFinishedEvent() {}
 
   public Object getDatum() { return datum; }
@@ -97,5 +144,18 @@ public class MapAttemptFinishedEvent  im
    public EventType getEventType() {
     return EventType.MAP_ATTEMPT_FINISHED;
   }
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
   
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Tue Mar  8 05:53:24 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -50,12 +52,16 @@ public class ReduceAttemptFinishedEvent 
    * @param hostname Name of the host where the attempt executed
    * @param state State of the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.  
    */
-  public ReduceAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long shuffleFinishTime, long sortFinishTime, 
-      long finishTime,
-      String hostname, String state, Counters counters) {
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters,
+     int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -66,6 +72,45 @@ public class ReduceAttemptFinishedEvent 
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /**
+   * Create an event to record completion of a reduce attempt
+   *
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   * @param id Attempt Id
+   * @param taskType Type of task
+   * @param taskStatus Status of the task
+   * @param shuffleFinishTime Finish time of the shuffle phase
+   * @param sortFinishTime Finish time of the sort phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param state State of the attempt
+   * @param counters Counters for the attempt
+   */
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters) {
+    this(id, taskType, taskStatus,
+         shuffleFinishTime, sortFinishTime, finishTime,
+         hostname, state, counters, null);
   }
 
   ReduceAttemptFinishedEvent() {}
@@ -105,4 +150,17 @@ public class ReduceAttemptFinishedEvent 
   }
 
 
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Tue Mar  8 05:53:24 2011
@@ -27,6 +27,9 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+import org.apache.hadoop.mapred.TaskStatus;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -47,11 +50,16 @@ public class TaskAttemptUnsuccessfulComp
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
    * @param error Error string
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.  
    */
-  public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id, 
-      TaskType taskType,
-      String status, long finishTime, 
-      String hostname, String error) {
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error,
+        int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.taskType = new Utf8(taskType.name());
     datum.attemptId = new Utf8(id.toString());
@@ -59,6 +67,40 @@ public class TaskAttemptUnsuccessfulComp
     datum.hostname = new Utf8(hostname);
     datum.error = new Utf8(error);
     datum.status = new Utf8(status);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /**
+   * Create an event to record the unsuccessful completion of attempts
+   *
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   * @param id Attempt ID
+   * @param taskType Type of the task
+   * @param status Status of the attempt
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param error Error string
+   */
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error) {
+    this(id, taskType, status, finishTime, hostname, error, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
@@ -101,4 +143,19 @@ public class TaskAttemptUnsuccessfulComp
               : EventType.REDUCE_ATTEMPT_KILLED);
   }
 
+
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Mar  8 05:53:24 2011
@@ -86,6 +86,9 @@ public interface JTConfig extends MRConf
     "mapreduce.jobtracker.jobhistory.completed.location";
   public static final String JT_JOBHISTORY_LOCATION = 
     "mapreduce.jobtracker.jobhistory.location";
+  // number of partial task progress reports we retain in job history
+  public static final String JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS =
+    "mapreduce.jobtracker.jobhistory.task.numberprogresssplits";
   public static final String JT_AVG_BLACKLIST_THRESHOLD = 
     "mapreduce.jobtracker.blacklist.average.threshold";
   public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java Tue Mar  8 05:53:24 2011
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.List;
+import java.util.ArrayList;
+
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -28,6 +31,15 @@ import junit.framework.TestCase;
  * Test various jobhistory events
  */
 public class TestJobHistoryEvents extends TestCase {
+  static final int[][] NULL_SPLITS_ARRAY
+    = new int[org.apache.hadoop.tools.rumen.LoggedTaskAttempt.SplitVectorKind.values().length][];
+
+  static {
+    for (int i = 0; i < NULL_SPLITS_ARRAY.length; ++i) {
+      NULL_SPLITS_ARRAY[i] = new int[0];
+    }
+  }
+ 
   /**
    * Test {@link TaskAttemptStartedEvent} for various task types.
    */
@@ -73,7 +85,8 @@ public class TestJobHistoryEvents extend
                                                      String state) {
     for (TaskType t : types) {
       TaskAttemptUnsuccessfulCompletionEvent tauce = 
-        new TaskAttemptUnsuccessfulCompletionEvent(id, t, state, 0L, "", "");
+        new TaskAttemptUnsuccessfulCompletionEvent
+           (id, t, state, 0L, "", "", NULL_SPLITS_ARRAY);
       assertEquals(expected, tauce.getEventType());
     }
   }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar  8 05:53:24 2011
@@ -590,6 +590,30 @@ public class TestRumenJobTraces {
   public void testTopologyBuilder() throws Exception {
     final TopologyBuilder subject = new TopologyBuilder();
 
+    // This 4 comes from 
+    //   TaskInProgress.ProgressibleSplitsBlock.burst().size , which 
+    //   is invisible here.
+
+    int[][] splits = new int[4][];
+
+    splits[0] = new int[12];
+    splits[1] = new int[12];
+    splits[2] = new int[12];
+    splits[3] = new int[12];
+
+    for (int j = 0; j < 4; ++j) {
+      for (int i = 0; i < 12; ++i) {
+        splits[j][i] = -1;
+      }
+    }
+
+    for (int i = 0; i < 6; ++i) {
+      splits[0][i] = 500000 * i;
+      splits[1][i] = 300000 * i;
+      splits[2][i] = 500000;
+      splits[3][i] = 700000;
+    }
+
     // currently we extract no host names from the Properties
     subject.process(new Properties());
 
@@ -598,16 +622,16 @@ public class TestRumenJobTraces {
         .valueOf("MAP"), "STATUS", 1234567890L,
         "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
         "SUCCESS", null));
-    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
-        .forName("attempt_200904211745_0003_m_000004_1"), TaskType
-        .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
-        "MACHINE_EXPLODED"));
-    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
-        .forName("attempt_200904211745_0003_m_000004_2"), TaskType
-        .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
-        "MACHINE_EXPLODED"));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+                    (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
+                     TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+                     "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+                     "MACHINE_EXPLODED", splits));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+                    (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
+                     TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+                     "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+                     "MACHINE_EXPLODED", splits));
     subject.process(new TaskStartedEvent(TaskID
         .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
         .valueOf("MAP"),

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Tue Mar  8 05:53:24 2011
@@ -468,6 +468,11 @@ public class JobBuilder {
     }
 
     attempt.setFinishTime(event.getFinishTime());
+
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -513,6 +518,10 @@ public class JobBuilder {
     attempt.setSortFinished(event.getSortFinishTime());
     attempt
         .incorporateCounters(((ReduceAttemptFinished) event.getDatum()).counters);
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -529,7 +538,11 @@ public class JobBuilder {
     // is redundant, but making this will add future-proofing.
     attempt.setFinishTime(event.getFinishTime());
     attempt
-        .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+      .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processJobUnsuccessfulCompletionEvent(

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Tue Mar  8 05:53:24 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.tools.rumen;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -69,10 +71,118 @@ public class LoggedTaskAttempt implement
 
   LoggedLocation location;
 
+  List<Integer> clockSplits = new ArrayList<Integer>();
+  List<Integer> cpuUsages = new ArrayList<Integer>();
+  List<Integer> vMemKbytes = new ArrayList<Integer>();
+  List<Integer> physMemKbytes = new ArrayList<Integer>();
+
   LoggedTaskAttempt() {
     super();
   }
 
+  // carries the kinds of splits vectors a LoggedTaskAttempt holds.
+  //
+  // Each enumeral has the following methods:
+  //   get(LoggedTaskAttempt attempt)
+  //    returns a List<Integer> with the corresponding value field
+  //   set(LoggedTaskAttempt attempt, List<Integer> newValue)
+  //    sets the value
+  // There is also a pair of methods get(List<List<Integer>>) and
+  //  set(List<List<Integer>>, List<Integer>) which correspondingly
+  //  delivers or sets the appropriate element of the
+  //  List<List<Integer>> .
+  // This makes it easier to add another kind in the future.
+  public enum SplitVectorKind {
+
+    WALLCLOCK_TIME {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getClockSplits();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setClockSplits(newValue);
+      }
+    },
+
+    CPU_USAGE {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getCpuUsages();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setCpuUsages(newValue);
+      }
+    },
+
+    VIRTUAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getVMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setVMemKbytes(newValue);
+      }
+    },
+
+    PHYSICAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getPhysMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setPhysMemKbytes(newValue);
+      }
+    };
+
+    static private final List<List<Integer>> NULL_SPLITS_VECTOR
+      = new ArrayList<List<Integer>>();
+
+    static {
+      for (SplitVectorKind kind : SplitVectorKind.values() ) {
+        NULL_SPLITS_VECTOR.add(new ArrayList<Integer>());
+      }
+    }
+
+    abstract public List<Integer> get(LoggedTaskAttempt attempt);
+
+    abstract public void set(LoggedTaskAttempt attempt, List<Integer> newValue);
+
+    public List<Integer> get(List<List<Integer>> listSplits) {
+      return listSplits.get(this.ordinal());
+    }
+
+    public void set(List<List<Integer>> listSplits, List<Integer> newValue) {
+      listSplits.set(this.ordinal(), newValue);
+    }
+
+    static public List<List<Integer>> getNullSplitsVector() {
+      return NULL_SPLITS_VECTOR;
+    }
+  }
+
+  /**
+   *
+   * @return a list of all splits vectors, ordered in enumeral order
+   *           within {@link SplitVectorKind} .  Do NOT index into the
+   *           returned list with hard-coded indices to get individual
+   *           values; use
+   *           {@code SplitVectorKind.get(LoggedTaskAttempt)} instead.
+   */
+  public List<List<Integer>> allSplitVectors() {
+    List<List<Integer>> result
+      = new ArrayList<List<Integer>>(SplitVectorKind.values().length);
+
+    for (SplitVectorKind kind : SplitVectorKind.values() ) {
+      result.add(kind.get(this));
+    }
+
+    return result;
+  }
+
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
 
@@ -87,6 +197,78 @@ public class LoggedTaskAttempt implement
     }
   }
 
+  public List<Integer> getClockSplits() {
+    return clockSplits;
+  }
+
+  void setClockSplits(List<Integer> clockSplits) {
+    this.clockSplits = clockSplits;
+  }
+
+  void arraySetClockSplits(int[] clockSplits) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < clockSplits.length; ++i) {
+      result.add(clockSplits[i]);
+    }
+                 
+    this.clockSplits = result;
+  }
+
+  public List<Integer> getCpuUsages() {
+    return cpuUsages;
+  }
+
+  void setCpuUsages(List<Integer> cpuUsages) {
+    this.cpuUsages = cpuUsages;
+  }
+
+  void arraySetCpuUsages(int[] cpuUsages) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < cpuUsages.length; ++i) {
+      result.add(cpuUsages[i]);
+    }
+                 
+    this.cpuUsages = result;
+  }
+
+  public List<Integer> getVMemKbytes() {
+    return vMemKbytes;
+  }
+
+  void setVMemKbytes(List<Integer> vMemKbytes) {
+    this.vMemKbytes = vMemKbytes;
+  }
+
+  void arraySetVMemKbytes(int[] vMemKbytes) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < vMemKbytes.length; ++i) {
+      result.add(vMemKbytes[i]);
+    }
+                 
+    this.vMemKbytes = result;
+  }
+
+  public List<Integer> getPhysMemKbytes() {
+    return physMemKbytes;
+  }
+
+  void setPhysMemKbytes(List<Integer> physMemKbytes) {
+    this.physMemKbytes = physMemKbytes;
+  }
+
+  void arraySetPhysMemKbytes(int[] physMemKbytes) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < physMemKbytes.length; ++i) {
+      result.add(physMemKbytes[i]);
+    }
+                 
+    this.physMemKbytes = result;
+  }
+
   void adjustTimes(long adjustment) {
     startTime += adjustment;
     finishTime += adjustment;
@@ -436,6 +618,26 @@ public class LoggedTaskAttempt implement
     c1.deepCompare(c2, recurse);
   }
 
+  private void compare1(List<Integer> c1, List<Integer> c2, TreePath loc,
+                        String eltname)
+        throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException
+              (eltname + " miscompared", new TreePath(loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      if (!c1.get(i).equals(c2.get(i))) {
+        throw new DeepInequalityException("" + c1.get(i) + " != " + c2.get(i),
+                                          new TreePath(loc, eltname, i));
+      }
+    }
+  }    
+
   public void deepCompare(DeepCompare comparand, TreePath loc)
       throws DeepInequalityException {
     if (!(comparand instanceof LoggedTaskAttempt)) {
@@ -474,5 +676,10 @@ public class LoggedTaskAttempt implement
     compare1(sortFinished, other.sortFinished, loc, "sortFinished");
 
     compare1(location, other.location, loc, "location");
+
+    compare1(clockSplits, other.clockSplits, loc, "clockSplits");
+    compare1(cpuUsages, other.cpuUsages, loc, "cpuUsages");
+    compare1(vMemKbytes, other.vMemKbytes, loc, "vMemKbytes");
+    compare1(physMemKbytes, other.physMemKbytes, loc, "physMemKbytes");
   }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Tue Mar  8 05:53:24 2011
@@ -68,10 +68,13 @@ public class MapAttempt20LineHistoryEven
             (MapAttempt20LineHistoryEventEmitter) thatg;
 
         if (finishTime != null && "success".equalsIgnoreCase(status)) {
-          return new MapAttemptFinishedEvent(taskAttemptID,
-              that.originalTaskType, status, Long.parseLong(finishTime), Long
-                  .parseLong(finishTime), hostName, state,
-              maybeParseCounters(counters));
+          return new MapAttemptFinishedEvent
+            (taskAttemptID,
+              that.originalTaskType, status,
+             Long.parseLong(finishTime),
+             Long.parseLong(finishTime),
+             hostName, state, maybeParseCounters(counters),
+             null);
         }
       }
 
@@ -88,5 +91,4 @@ public class MapAttempt20LineHistoryEven
   List<SingleEventEmitter> nonFinalSEEs() {
     return nonFinals;
   }
-
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Tue Mar  8 05:53:24 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -26,11 +28,33 @@ import org.apache.hadoop.mapred.TaskStat
 public class MapTaskAttemptInfo extends TaskAttemptInfo {
   private long runtime;
 
-  public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {
-    super(state, taskInfo);
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+                            long runtime, List<List<Integer>> allSplits) {
+    super(state, taskInfo,
+          allSplits == null
+            ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+           : allSplits);
     this.runtime = runtime;
   }
 
+  /**
+   *
+   * @deprecated please use the constructor with 
+   *               {@code (state, taskInfo, runtime,
+   *                  List<List<Integer>> allSplits)}
+   *             instead.  
+   *
+   * see {@link LoggedTaskAttempt} for an explanation of
+   *        {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+  @Deprecated
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+                            long runtime) {
+    this(state, taskInfo, runtime, null);
+  }
+
   @Override
   public long getRuntime() {
     return getMapRuntime();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Tue Mar  8 05:53:24 2011
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 
-public class ReduceAttempt20LineHistoryEventEmitter extends
-    TaskAttempt20LineEventEmitter {
+public class ReduceAttempt20LineHistoryEventEmitter
+     extends TaskAttempt20LineEventEmitter {
 
   static List<SingleEventEmitter> nonFinals =
       new LinkedList<SingleEventEmitter>();
@@ -71,10 +71,15 @@ public class ReduceAttempt20LineHistoryE
           ReduceAttempt20LineHistoryEventEmitter that =
               (ReduceAttempt20LineHistoryEventEmitter) thatg;
 
-          return new ReduceAttemptFinishedEvent(taskAttemptID,
-              that.originalTaskType, status, Long.parseLong(shuffleFinish),
-              Long.parseLong(sortFinish), Long.parseLong(finishTime), hostName,
-              state, maybeParseCounters(counters));
+          return new ReduceAttemptFinishedEvent
+            (taskAttemptID,
+             that.originalTaskType, status,
+             Long.parseLong(shuffleFinish),
+             Long.parseLong(sortFinish),
+             Long.parseLong(finishTime),
+             hostName,
+             state, maybeParseCounters(counters),
+             null);
         }
       }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java Tue Mar  8 05:53:24 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -29,13 +31,35 @@ public class ReduceTaskAttemptInfo exten
   private long reduceTime;
 
   public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
-      long mergeTime, long reduceTime) {
-    super(state, taskInfo);
+      long mergeTime, long reduceTime, List<List<Integer>> allSplits) {
+    super(state, taskInfo,
+          allSplits == null
+            ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+           : allSplits);
     this.shuffleTime = shuffleTime;
     this.mergeTime = mergeTime;
     this.reduceTime = reduceTime;
   }
 
+
+  /**
+   *
+   * @deprecated please use the constructor with 
+   *               {@code (state, taskInfo, shuffleTime, mergeTime, reduceTime,
+   *                  List<List<Integer>> allSplits)}
+   *             instead.  
+   *
+   * see {@link LoggedTaskAttempt} for an explanation of
+   *        {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+  @Deprecated
+  public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+      long mergeTime, long reduceTime) {
+    this(state, taskInfo, shuffleTime, mergeTime, reduceTime, null);
+  }
+
   /**
    * Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
    * 
@@ -67,5 +91,4 @@ public class ReduceTaskAttemptInfo exten
   public long getRuntime() {
     return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
   }
-
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Tue Mar  8 05:53:24 2011
@@ -138,9 +138,10 @@ public abstract class TaskAttempt20LineE
         TaskAttempt20LineEventEmitter that =
             (TaskAttempt20LineEventEmitter) thatg;
 
-        return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
-            that.originalTaskType, status, Long.parseLong(finishTime),
-            hostName, error);
+        return new TaskAttemptUnsuccessfulCompletionEvent
+          (taskAttemptID,
+           that.originalTaskType, status, Long.parseLong(finishTime),
+           hostName, error, null);
       }
 
       return null;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Tue Mar  8 05:53:24 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -27,13 +29,22 @@ public abstract class TaskAttemptInfo {
   protected final State state;
   protected final TaskInfo taskInfo;
 
-  protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+  protected final List<List<Integer>> allSplits;
+
+  protected TaskAttemptInfo
+       (State state, TaskInfo taskInfo, List<List<Integer>> allSplits) {
     if (state == State.SUCCEEDED || state == State.FAILED) {
       this.state = state;
     } else {
       throw new IllegalArgumentException("status cannot be " + state);
     }
     this.taskInfo = taskInfo;
+    this.allSplits = allSplits;
+  }
+
+  protected TaskAttemptInfo
+       (State state, TaskInfo taskInfo) {
+    this(state, taskInfo, LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector());
   }
 
   /**
@@ -60,4 +71,8 @@ public abstract class TaskAttemptInfo {
   public TaskInfo getTaskInfo() {
     return taskInfo;
   }
+      
+  public List<Integer> getSplitVector(LoggedTaskAttempt.SplitVectorKind kind) {
+    return kind.get(allSplits);
+  }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1079187&r1=1079186&r2=1079187&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Tue Mar  8 05:53:24 2011
@@ -525,7 +525,8 @@ public class ZombieJob implements JobSto
       }
       taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
       taskTime *= scaleFactor;
-      return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+      return new MapTaskAttemptInfo
+        (state, taskInfo, taskTime, loggedAttempt.allSplitVectors());
     } else {
       throw new IllegalArgumentException("taskType can only be MAP: "
           + loggedTask.getTaskType());
@@ -572,6 +573,9 @@ public class ZombieJob implements JobSto
   private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
       LoggedTaskAttempt loggedAttempt) {
     TaskInfo taskInfo = getTaskInfo(loggedTask);
+    
+    List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors();
+
     State state = convertState(loggedAttempt.getResult());
     if (loggedTask.getTaskType() == Values.MAP) {
       long taskTime;
@@ -582,7 +586,7 @@ public class ZombieJob implements JobSto
         taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
       }
       taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
-      return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+      return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors);
     } else if (loggedTask.getTaskType() == Values.REDUCE) {
       long startTime = loggedAttempt.getStartTime();
       long mergeDone = loggedAttempt.getSortFinished();
@@ -593,7 +597,8 @@ public class ZombieJob implements JobSto
         // haven't seen reduce task with startTime=0 ever. But if this happens,
         // make up a reduceTime with no shuffle/merge.
         long reduceTime = makeUpReduceRuntime(state);
-        return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+        return new ReduceTaskAttemptInfo
+          (state, taskInfo, 0, 0, reduceTime, allSplitVectors);
       } else {
         if (shuffleDone <= 0) {
           shuffleDone = startTime;
@@ -607,7 +612,7 @@ public class ZombieJob implements JobSto
         reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
         
         return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
-            mergeTime, reduceTime);
+            mergeTime, reduceTime, allSplitVectors);
       }
     } else {
       throw new IllegalArgumentException("taskType for "
@@ -684,7 +689,8 @@ public class ZombieJob implements JobSto
       runtime = makeUpMapRuntime(state, locality);
       runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
           taskNumber, taskAttemptNumber).toString());
-      TaskAttemptInfo tai = new MapTaskAttemptInfo(state, taskInfo, runtime);
+      TaskAttemptInfo tai
+        = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
       return tai;
     } else if (taskType == TaskType.REDUCE) {
       State state = State.SUCCEEDED;
@@ -695,8 +701,8 @@ public class ZombieJob implements JobSto
       // TODO make up state
       // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
       reduceTime = makeUpReduceRuntime(state);
-      TaskAttemptInfo tai = new ReduceTaskAttemptInfo(state, taskInfo,
-          shuffleTime, sortTime, reduceTime);
+      TaskAttemptInfo tai = new ReduceTaskAttemptInfo
+        (state, taskInfo, shuffleTime, sortTime, reduceTime, null);
       return tai;
     }