Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/03/06 07:42:31 UTC

svn commit: r750783 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: yhemanth
Date: Fri Mar  6 06:42:30 2009
New Revision: 750783

URL: http://svn.apache.org/viewvc?rev=750783&view=rev
Log:
HADOOP-5338. Fix jobtracker restart to clear task completion events cached by tasktrackers to avoid missing events. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Mar  6 06:42:30 2009
@@ -946,6 +946,10 @@
     HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
     (hairong)
 
+    HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
+    tasktrackers forcing them to fetch all events afresh, thus avoiding missed
+    task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java Fri Mar  6 06:42:30 2009
@@ -21,8 +21,10 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +41,7 @@
   short responseId;
   int heartbeatInterval;
   TaskTrackerAction[] actions;
-  Map<JobID, Integer> lastKnownIndexMap = null;
+  Set<JobID> recoveredJobs = new HashSet<JobID>();
 
   HeartbeatResponse() {}
   
@@ -57,12 +59,12 @@
     return responseId;
   }
   
-  public void setLastKnownIndices(Map<JobID, Integer> lastKnownIndexMap) {
-    this.lastKnownIndexMap = lastKnownIndexMap; 
+  public void setRecoveredJobs(Set<JobID> ids) {
+    recoveredJobs = ids; 
   }
   
-  public Map<JobID, Integer> getLastKnownIndex() {
-    return lastKnownIndexMap;
+  public Set<JobID> getRecoveredJobs() {
+    return recoveredJobs;
   }
   
   public void setActions(TaskTrackerAction[] actions) {
@@ -101,17 +103,11 @@
         action.write(out);
       }
     }
-    // Write the last map event index for the jobs
-    if (lastKnownIndexMap != null) {
-      out.writeInt(lastKnownIndexMap.size());
-      for (Map.Entry<JobID, Integer> entry : lastKnownIndexMap.entrySet()) {
-        entry.getKey().write(out);
-        out.writeInt(entry.getValue());
-      }
-    } else {
-      out.writeInt(0);
+    // Write the job ids of the jobs that were recovered
+    out.writeInt(recoveredJobs.size());
+    for (JobID id : recoveredJobs) {
+      id.write(out);
     }
-    //ObjectWritable.writeObject(out, actions, actions.getClass(), conf);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -129,17 +125,12 @@
     } else {
       actions = null;
     }
-    // Read the last map events index of the jobs
+    // Read the job ids of the jobs that were recovered
     int size = in.readInt();
-    if (size != 0) {
-      lastKnownIndexMap = new HashMap<JobID, Integer>(size);
-      for (int i = 0; i < size; ++i) {
-        JobID id = new JobID();
-        id.readFields(in);
-        int count = in.readInt();
-        lastKnownIndexMap.put(id, count);
-      }
+    for (int i = 0; i < size; ++i) {
+      JobID id = new JobID();
+      id.readFields(in);
+      recoveredJobs.add(id);
     }
-    //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf);
   }
 }
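
The serialization change above also drops the old null branch: an empty set naturally round-trips as a zero count, so write() and readFields() stay symmetric. A minimal standalone sketch of the same count-then-ids wire format, using String ids in place of org.apache.hadoop.mapred.JobID so it runs without Hadoop on the classpath (class, method, and the sample job id below are illustrative):

    import java.io.*;
    import java.util.*;

    public class RecoveredJobsFormat {
        // Mirrors HeartbeatResponse.write(): a count followed by each id.
        static void writeRecoveredJobs(DataOutput out, Set<String> ids)
                throws IOException {
            out.writeInt(ids.size());
            for (String id : ids) {
                out.writeUTF(id); // JobID.write(out) in the real code
            }
        }

        // Mirrors HeartbeatResponse.readFields(): no special case needed,
        // an empty set simply arrives as a zero count.
        static Set<String> readRecoveredJobs(DataInput in) throws IOException {
            int size = in.readInt();
            Set<String> ids = new HashSet<String>(size);
            for (int i = 0; i < size; ++i) {
                ids.add(in.readUTF());
            }
            return ids;
        }

        public static void main(String[] args) throws IOException {
            Set<String> jobs =
                new HashSet<String>(Arrays.asList("job_200903060642_0001"));
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writeRecoveredJobs(new DataOutputStream(buf), jobs);
            DataInput in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
            System.out.println(readRecoveredJobs(in)); // [job_200903060642_0001]
        }
    }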

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar  6 06:42:30 2009
@@ -60,8 +60,9 @@
    * Version 23: Added parameter 'initialContact' again in heartbeat method
    *            (HADOOP-4869) 
    * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
+   * Version 25: JobIDs are passed in response to JobTracker restart 
    */
-  public static final long versionID = 24L;
+  public static final long versionID = 25L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
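
Bumping versionID matters here because both ends serialize HeartbeatResponse by hand; a tracker speaking version 24 against a jobtracker speaking 25 would misparse the stream. A sketch of the fail-fast idea (the method below is hypothetical; Hadoop's RPC layer performs the real comparison through VersionedProtocol.getProtocolVersion):

    import java.io.IOException;

    class VersionCheck {
        // Hypothetical fail-fast comparison of the two constants.
        static void ensureMatch(long clientVersion, long serverVersion)
                throws IOException {
            if (clientVersion != serverVersion) {
                throw new IOException("InterTrackerProtocol version mismatch: "
                    + "client=" + clientVersion + ", server=" + serverVersion);
            }
        }
    }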

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  6 06:42:30 2009
@@ -848,6 +848,10 @@
       return jobsToRecover.size() != 0;
     }
 
+    Set<JobID> getJobsToRecover() {
+      return jobsToRecover;
+    }
+
     /** Check if the given string represents a job-id or not 
      */
     private boolean isJobNameValid(String str) {
@@ -1127,7 +1131,9 @@
       long recoveryStartTime = System.currentTimeMillis();
 
       // II. Recover each job
-      for (JobID id : jobsToRecover) {
+      idIter = jobsToRecover.iterator();
+      while (idIter.hasNext()) {
+        JobID id = idIter.next();
         JobInProgress pJob = getJob(id);
 
         // 1. Get the required info
@@ -1169,11 +1175,9 @@
                    + id + ". Ignoring it.", ioe);
         }
 
-        // 6. Inform the jobtracker as to how much of the data is recovered.
-        // This is done so that TT should rollback to account for lost
-        // updates
-        lastSeenEventMapOnRestart.put(pJob.getStatus().getJobID(), 
-                                      pJob.getNumTaskCompletionEvents());
+        if (pJob.isComplete()) {
+          idIter.remove(); // no need to keep this job info as its successful
+        }
       }
 
       recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
@@ -1198,8 +1202,6 @@
         trackerExpiryQueue.add(status);
       }
 
-      // IV. Cleanup
-      jobsToRecover.clear();
       LOG.info("Restoration complete");
     }
     
@@ -1277,10 +1279,6 @@
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
-  // A map from JobID to the last known task-completion-event-index on restart
-  Map<JobID, Integer> lastSeenEventMapOnRestart = 
-    new HashMap<JobID, Integer>();
-  
   // Number of resolved entries
   int numResolved;
     
@@ -2375,7 +2373,7 @@
     
     // check if the restart info is req
     if (addRestartInfo) {
-      response.setLastKnownIndices(lastSeenEventMapOnRestart);
+      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
     }
         
     // Update the trackerToHeartbeatResponseMap
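
Switching the recovery loop from a for-each to an explicit Iterator is what makes the in-place removal legal: calling jobsToRecover.remove() directly while iterating would throw ConcurrentModificationException. Completed jobs are pruned as they are replayed, so whatever survives in the set is exactly what getJobsToRecover() hands to heartbeat responses. A self-contained sketch of the pattern, with a stand-in Job type in place of JobInProgress:

    import java.util.*;

    class RecoverySketch {
        static class Job {
            final String id; final boolean complete;
            Job(String id, boolean complete) {
                this.id = id; this.complete = complete;
            }
        }

        // Drop completed jobs from the to-recover set while walking it.
        static void pruneCompleted(Set<String> jobsToRecover,
                                   Map<String, Job> jobs) {
            Iterator<String> it = jobsToRecover.iterator();
            while (it.hasNext()) {
                Job job = jobs.get(it.next());
                // ... replay history and recover task state here ...
                if (job != null && job.complete) {
                    it.remove(); // safe only through the iterator
                }
            }
        }
    }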

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  6 06:42:30 2009
@@ -725,17 +725,14 @@
     }
       
     /**
-     * Check if the number of events that are obtained are more than required.
-     * If yes then purge the extra ones.
+     * Reset the events obtained so far.
      */
-    public void purgeMapEvents(int lastKnownIndex) {
+    public void reset() {
       // Note that the sync is first on fromEventId and then on allMapEvents
       synchronized (fromEventId) {
         synchronized (allMapEvents) {
-          if (allMapEvents.size() > lastKnownIndex) {
-            fromEventId.set(lastKnownIndex);
-            allMapEvents = allMapEvents.subList(0, lastKnownIndex);
-          }
+          fromEventId.set(0); // set the new index for TCE
+          allMapEvents.clear();
         }
       }
     }
@@ -1094,19 +1091,19 @@
         
         
         // Check if the map-event list needs purging
-        if (heartbeatResponse.getLastKnownIndex() != null) {
+        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
+        if (jobs.size() > 0) {
           synchronized (this) {
             // purge the local map events list
-            for (Map.Entry<JobID, Integer> entry 
-                 : heartbeatResponse.getLastKnownIndex().entrySet()) {
+            for (JobID job : jobs) {
               RunningJob rjob;
               synchronized (runningJobs) {
-                rjob = runningJobs.get(entry.getKey());          
+                rjob = runningJobs.get(job);          
                 if (rjob != null) {
                   synchronized (rjob) {
                     FetchStatus f = rjob.getFetchStatus();
                     if (f != null) {
-                      f.purgeMapEvents(entry.getValue());
+                      f.reset();
                     }
                   }
                 }
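
reset() replaces the index arithmetic of purgeMapEvents() with a full rewind: the fetch cursor goes back to zero and the cache is emptied, so the tracker's next poll pulls every event afresh from the restarted jobtracker. A sketch of that nested-lock reset, with AtomicInteger standing in for the tracker's cursor field and String events for TaskCompletionEvent (names are illustrative):

    import java.util.*;
    import java.util.concurrent.atomic.AtomicInteger;

    class FetchStatusSketch {
        private final AtomicInteger fromEventId = new AtomicInteger(0);
        private final List<String> allMapEvents = new ArrayList<String>();

        // Lock order matches the real code: fromEventId first, then
        // allMapEvents, so a concurrent fetch never sees a cleared list
        // paired with a stale cursor.
        public void reset() {
            synchronized (fromEventId) {
                synchronized (allMapEvents) {
                    fromEventId.set(0);   // next fetch re-reads from event 0
                    allMapEvents.clear(); // forget everything cached pre-restart
                }
            }
        }
    }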

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=750783&r1=750782&r2=750783&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar  6 06:42:30 2009
@@ -25,6 +25,8 @@
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /** 
  * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker 
@@ -255,18 +257,6 @@
     // Test if all the events that were recovered match exactly
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
     
-    TaskCompletionEvent[] trackerEvents;
-    while(true) {
-      trackerEvents = 
-        mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
-          .getMapTaskCompletionEvents();
-      if (trackerEvents.length < jtEvents.length) {
-        UtilsForTests.waitFor(100);
-      } else {
-        break;
-      }
-    }
-    
     // Check the task reports
     // The reports should match exactly if the attempts are same
     TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
@@ -278,13 +268,36 @@
     assertEquals("Job priority change is not reflected", 
                  JobPriority.HIGH, mr.getJobPriority(id));
     
+    List<TaskCompletionEvent> jtMapEvents = 
+      new ArrayList<TaskCompletionEvent>();
+    for (TaskCompletionEvent tce : jtEvents) {
+      if (tce.isMapTask()) {
+        jtMapEvents.add(tce);
+      }
+    }
+
+    TaskCompletionEvent[] trackerEvents;
+    while(true) {
+      // wait for the tracker to pull all the map events
+      trackerEvents =
+        mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
+        .getMapTaskCompletionEvents();
+      if (trackerEvents.length < jtMapEvents.size()) {
+        UtilsForTests.waitFor(1000);
+      } else {
+        break;
+      }
+    }
+
+    testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]), 
+                             trackerEvents, true, -1);
+    
     //  Signal the reduce tasks
     UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
                               getReduceSignalFile(shareDir));
     
     UtilsForTests.waitTillDone(jobClient);
     
-    testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
     
     // validate the history file
     TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
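
The relocated block now waits until the tracker has pulled at least as many map events as the jobtracker reports before comparing, rather than racing the fetch against a fixed count. The same wait-then-assert shape as a reusable helper (a sketch; the sleep and timeout knobs are illustrative, not taken from the test):

    import java.util.concurrent.Callable;

    class PollUntil {
        // Retry until the condition holds, sleeping between attempts.
        static void waitUntil(Callable<Boolean> condition, long sleepMs,
                              long timeoutMs) throws Exception {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.call()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(
                        "condition not met within " + timeoutMs + "ms");
                }
                Thread.sleep(sleepMs);
            }
        }
    }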