You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/08/09 00:24:02 UTC

svn commit: r429858 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Aug  8 15:24:01 2006
New Revision: 429858

URL: http://svn.apache.org/viewvc?rev=429858&view=rev
Log:
HADOOP-427.  New bug number for this patch.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug  8 15:24:01 2006
@@ -3,7 +3,7 @@
 
 Trunk (unreleased changes)
 
- 1. HADOOP-415.  Replace some uses of DatanodeDescriptor in the DFS
+ 1. HADOOP-427.  Replace some uses of DatanodeDescriptor in the DFS
     web UI code with DatanodeInfo, the preferred public class.
     (Devaraj Das via cutting)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Aug  8 15:24:01 2006
@@ -280,7 +280,7 @@
           } else if (status.getRunState() == TaskStatus.FAILED) {
             // Tell the job to fail the relevant task
             failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
-                       wasRunning, wasComplete);
+                       wasRunning, wasComplete, metrics);
           }          
         }
 
@@ -520,7 +520,7 @@
                 }
             }
         }
-
+        
         //
         // If all tasks are complete, then the job is done!
         //
@@ -571,7 +571,8 @@
      */
     private void failedTask(TaskInProgress tip, String taskid, 
                             TaskStatus status, String trackerName,
-                            boolean wasRunning, boolean wasComplete) {
+                            boolean wasRunning, boolean wasComplete,
+                            JobTrackerMetrics metrics) {
         tip.failedSubTask(taskid, trackerName);
         boolean isRunning = tip.isRunning();
         boolean isComplete = tip.isComplete();
@@ -596,8 +597,10 @@
         // the failed task goes to the end of the list.
         if (tip.isMapTask()) {
           firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
+          metrics.failedMap();
         } else {
           firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
+          metrics.failedReduce();
         }
             
         //
@@ -695,4 +698,5 @@
        }
        return null;
     }
+    
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Aug  8 15:24:01 2006
@@ -219,6 +219,20 @@
                             // tracker has already been destroyed.
                             if (newProfile != null) {
                                 if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+                                    // But save the state so that if at a later
+                                    // point of time, we happen to hear from the
+                                    // same TaskTracker, we can reinstate 
+                                    // the state
+                                    ExpiredTaskTrackerState 
+                                        expTaskTrackerState = 
+                                        new ExpiredTaskTrackerState(
+                                           leastRecent.getTrackerName());
+                                    if (LOG.isDebugEnabled())
+                                      LOG.debug("Saving state of TaskTracker " +
+                                             leastRecent.getTrackerName());
+                                    expiredTaskTrackerStates.put(
+                                            leastRecent.getTrackerName(), 
+                                            expTaskTrackerState);
                                     // Remove completely
                                     updateTaskTrackerStatus(trackerName, null);
                                     lostTaskTracker(leastRecent.getTrackerName(),
@@ -347,6 +361,11 @@
         Metrics.report(metricsRecord, "maps-completed",
             ++numMapTasksCompleted);
       }
+
+      synchronized void failedMap() {
+        Metrics.report(metricsRecord, "maps-completed",
+            --numMapTasksCompleted);
+      }
       
       synchronized void launchReduce() {
         Metrics.report(metricsRecord, "reduces-launched",
@@ -357,6 +376,11 @@
         Metrics.report(metricsRecord, "reduces-completed",
             ++numReduceTasksCompleted);
       }
+
+      synchronized void failedReduce() {
+        Metrics.report(metricsRecord, "reduces-completed",
+            --numReduceTasksCompleted);
+      }
       
       synchronized void submitJob() {
         Metrics.report(metricsRecord, "jobs-submitted",
@@ -427,6 +451,7 @@
     Thread initJobsThread = null;
     ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
     Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+    private TreeMap expiredTaskTrackerStates = new TreeMap();
     
     /**
      * It might seem like a bug to maintain a TreeSet of status objects,
@@ -599,6 +624,36 @@
         LOG.info("stopped all jobtracker services");
         return;
     }
+
+    boolean reinstateStateOfTaskTracker(String trackerName) {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Going to reinstate state of tasktracker " + trackerName);
+      ExpiredTaskTrackerState e = (ExpiredTaskTrackerState)
+                                   expiredTaskTrackerStates.get(trackerName);
+      if (e == null) return false;
+      Set taskset = e.getTaskSet();
+      if (taskset == null) return true;
+      for (Iterator it = taskset.iterator(); it.hasNext(); ) {
+        String taskId = (String) it.next();
+        TaskInProgress tip = e.getTIP(taskId);
+        if (LOG.isDebugEnabled())
+          LOG.debug("Going to recreate task entry for task " + taskId);
+        //check whether the job is still running
+        if (tip != null && 
+                tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+          createTaskEntry(taskId, trackerName, tip);
+      }
+      ArrayList completedTasks = e.getCompletedTasks();
+      for (int i = 0; i < completedTasks.size(); i++) {
+        TaskStatus ts = (TaskStatus)completedTasks.get(i);
+        TaskInProgress tip = (TaskInProgress)taskidToTIPMap.get(ts.getTaskId());
+        if (tip == null) continue;
+        JobInProgress j = tip.getJob();
+        if (j != null && j.getStatus().getRunState() == JobStatus.RUNNING)
+          j.updateTaskStatus(tip, ts, myMetrics); 
+      }
+      return true;
+    }
     
     ///////////////////////////////////////////////////////
     // Maintain lookup tables; called by JobInProgress
@@ -748,7 +803,11 @@
                 } else {
                     // If not first contact, there should be some record of the tracker
                     if (!seenBefore) {
-                        return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+                        if (!reinstateStateOfTaskTracker(trackerName))
+                          return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+                        else 
+                          trackerExpiryQueue.add(trackerStatus);
+                          
                     }
                 }
 
@@ -1196,5 +1255,68 @@
 
         Configuration conf=new Configuration();
         startTracker(conf);
+    }
+
+    private class ExpiredTaskTrackerState {
+      //Map from taskId (assigned to a given tasktracker) to the taskId's TIP
+      private TreeMap trackerTaskIdToTIPMap = new TreeMap();
+      //completedTasks is an array list that contains the list of tasks that a
+      //tasktracker successfully completed
+      ArrayList completedTasks = new ArrayList();
+
+      public ExpiredTaskTrackerState(String trackerId) {
+        trackerTaskIdToTIPMap.clear();
+        completedTasks.clear();
+        TreeSet tasks = (TreeSet) trackerToTaskMap.get(trackerId);
+        if (tasks == null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug("This tasktracker has no tasks");
+          return;
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Task IDs that this tasktracker has: ");
+        //We save the status of completed tasks only since TaskTrackers don't
+        //send updates about completed tasks. We don't need to save the status
+        //of other tasks since the TaskTracker will send the update along
+        //with the heartbeat (whenever that happens).
+        //Saving the status of completed tasks is required since the JobTracker
+        //will mark all tasks that belonged to a given TaskTracker as failed
+        //if that TaskTracker is lost. Now, if that same TaskTracker reports 
+        //in later on, we can simply re-mark the completed tasks (TIPs really) 
+        //it reported earlier about as "completed" and avoid unnecessary 
+        //re-run of those tasks.
+        for (Iterator it = tasks.iterator(); it.hasNext(); ) {
+          String taskId = (String) it.next();
+          if (LOG.isDebugEnabled())
+            LOG.debug(taskId);
+          TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+          if (tip !=null && 
+                  tip.getJob().getStatus().getRunState() == JobStatus.RUNNING)
+            trackerTaskIdToTIPMap.put(taskId, tip);
+          else continue;
+          TaskStatus ts = tip.getTaskStatus(taskId);
+          //ts could be null for a recently assigned task, in the case where,
+          //the tasktracker hasn't yet reported status about that task
+          if (ts == null) continue;
+          if (tip.isComplete()) {
+            TaskStatus saveTS = null;
+            try {
+              saveTS = (TaskStatus)ts.clone();
+            } catch (Exception e) {
+              LOG.fatal("Could not save TaskTracker state",e);
+            }
+            completedTasks.add(saveTS);
+          }
+        } 
+      }
+      public Set getTaskSet() {
+        return trackerTaskIdToTIPMap.keySet();
+      }
+      public TaskInProgress getTIP(String taskId) {
+        return (TaskInProgress)trackerTaskIdToTIPMap.get(taskId);
+      }
+      public ArrayList getCompletedTasks() {
+        return completedTasks;
+      }
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Aug  8 15:24:01 2006
@@ -283,6 +283,12 @@
         
         taskStatuses.put(taskid, status);
 
+        //since if this task was declared failed due to tasktracker getting 
+        //lost, but now that same tasktracker reports in with this taskId as 
+        //running, we update recentTasks
+        if (status.getRunState() == TaskStatus.RUNNING)
+          recentTasks.add(taskid);
+
         // Recompute progress
         recomputeProgress();
         return changed;
@@ -469,5 +475,12 @@
      */
     public int getIdWithinJob() {
       return partition;
+    }
+
+    /**
+     * Get the TaskStatus associated with a given taskId
+     */
+    public TaskStatus getTaskStatus(String taskId) {
+      return (TaskStatus)taskStatuses.get(taskId);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=429858&r1=429857&r2=429858&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Aug  8 15:24:01 2006
@@ -25,7 +25,7 @@
  *
  * @author Mike Cafarella
  **************************************************/
-class TaskStatus implements Writable {
+class TaskStatus implements Writable, Cloneable {
     public static final int RUNNING = 0;
     public static final int SUCCEEDED = 1;
     public static final int FAILED = 2;
@@ -51,6 +51,16 @@
         this.diagnosticInfo = diagnosticInfo;
         this.stateString = stateString;
         this.taskTracker = taskTracker;
+    }
+    
+    //Implementing the clone method so that we can save the status of tasks
+    public Object clone() throws CloneNotSupportedException {
+        TaskStatus ts = (TaskStatus)super.clone();
+        if (this.diagnosticInfo != null)
+          ts.diagnosticInfo = new String(this.diagnosticInfo);
+        if (this.stateString != null)
+          ts.stateString = new String(this.stateString);
+        return ts;
     }
     
     public String getTaskId() { return taskid; }