You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/07/22 16:49:00 UTC

svn commit: r796751 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/server/jobtracker/ src/test/mapred/org/apache/hadoop/mapred/

Author: yhemanth
Date: Wed Jul 22 14:49:00 2009
New Revision: 796751

URL: http://svn.apache.org/viewvc?rev=796751&view=rev
Log:
MAPREDUCE-682. Removes reservations on tasktrackers which are blacklisted. Contributed by Sreekanth Ramakrishnan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 22 14:49:00 2009
@@ -250,3 +250,7 @@
 
     MAPREDUCE-735. Fixes a problem in the KeyFieldHelper to do with 
     the end index for some inputs (Amar Kamat via ddas)
+
+    MAPREDUCE-682. Removes reservations on tasktrackers which are
+    blacklisted. (Sreekanth Ramakrishnan via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jul 22 14:49:00 2009
@@ -629,6 +629,13 @@
       } else {
         LOG.info("Blacklisting tracker : " + hostName 
             + " Reason for blacklisting is : " + rfb);
+        Set<TaskTracker> trackers = 
+          hostnameToTaskTracker.get(hostName);
+        synchronized (trackers) {
+          for (TaskTracker tracker : trackers) {
+            tracker.cancelAllReservations();
+          }
+        }
         removeHostCapacity(hostName);
         fi.setBlacklist(rfb, reason);
       }
@@ -3999,7 +4006,7 @@
       }
 
       // Cleanup
-      taskTracker.lost();
+      taskTracker.cancelAllReservations();
 
       // Purge 'marked' tasks, needs to be done  
       // here to prevent hanging references!

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java Wed Jul 22 14:49:00 2009
@@ -183,10 +183,13 @@
   }
   
   /**
-   * Cleanup when the {@link TaskTracker} is declared as 'lost' by the 
-   * JobTracker.
+   * Cleanup when the {@link TaskTracker} is declared as 'lost/blacklisted'
+   * by the JobTracker.
+   * 
+   * The method assumes that the lock on the {@link JobTracker} is obtained
+   * by the caller.
    */
-  public void lost() {
+  public void cancelAllReservations() {
     // Inform jobs which have reserved slots on this tasktracker
     if (jobForFallowMapSlot != null) {
       unreserveSlots(TaskType.MAP, jobForFallowMapSlot);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=796751&r1=796750&r2=796751&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Wed Jul 22 14:49:00 2009
@@ -33,6 +33,8 @@
 import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 public class TestTaskTrackerBlacklisting extends TestCase {
 
@@ -182,14 +184,15 @@
     sendHeartBeat(null, false);
     assertEquals("Tracker 1 still blacklisted after a day", jobTracker
         .getBlacklistedTrackerCount(), 0);
+    //Cleanup the blacklisted trackers.
+    //Tracker is black listed due to failure count, so clock has to be
+    //forwarded by a day.
     clock.jumpADay = false;
   }
 
   public void testNodeHealthBlackListing() throws Exception {
-    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
-    status.setNodeHealthy(false);
-    status.setLastReported(System.currentTimeMillis());
-    status.setHealthReport("ERROR");
+    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    //Blacklist tracker due to node health failures.
     sendHeartBeat(status, false);
     for (String host : hosts) {
       checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
@@ -197,6 +200,8 @@
     status.setNodeHealthy(true);
     status.setLastReported(System.currentTimeMillis());
     status.setHealthReport("");
+    //white list tracker so the further test cases can be
+    //using trackers.
     sendHeartBeat(status, false);
     assertEquals("Trackers still blacklisted after healthy report", jobTracker
         .getBlacklistedTrackerCount(), 0);
@@ -207,10 +212,8 @@
     assertEquals("Tracker 1 not blacklisted", jobTracker
         .getBlacklistedTrackerCount(), 1);
     checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
-    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
-    status.setNodeHealthy(false);
-    status.setLastReported(System.currentTimeMillis());
-    status.setHealthReport("ERROR");
+    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    
     sendHeartBeat(status, false);
 
     assertEquals("All trackers not blacklisted", 
@@ -228,21 +231,19 @@
     for (String host : hosts) {
       checkReasonForBlackListing(host, nodeUnHealthyReasonSet);
     }
+    //clear blacklisted trackers due to node health reasons.
     sendHeartBeat(null, false);
     
     assertEquals("All trackers not white listed", 
         jobTracker.getBlacklistedTrackerCount(), 0);
-    
+    //Clear the blacklisted trackers due to failures.
     clock.jumpADay = false;
   }
   
   public void testBlacklistingReasonString() throws Exception {
     String error = "ERROR";
     String error1 = "ERROR1";
-    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
-    status.setNodeHealthy(false);
-    status.setLastReported(System.currentTimeMillis());
-    status.setHealthReport(error);
+    TaskTrackerHealthStatus status = getUnhealthyNodeStatus(error);
     sendHeartBeat(status, false);
 
     assertEquals("All trackers not blacklisted", jobTracker
@@ -272,6 +273,58 @@
           jobTracker.getReasonsForBlacklisting(hosts[i]).replace("\n", ""),
           error1);
     }
+    //clear the blacklisted trackers with node health reasons.
+    sendHeartBeat(null, false);
+  }
+
+  private TaskTrackerHealthStatus getUnhealthyNodeStatus(String error) {
+    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+    status.setNodeHealthy(false);
+    status.setLastReported(System.currentTimeMillis());
+    status.setHealthReport(error);
+    return status;
+  }
+  
+  public void testBlackListingWithTrackerReservation() throws Exception {
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
+    TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
+    tt1.reserveSlots(TaskType.MAP, job, 1);
+    tt1.reserveSlots(TaskType.REDUCE, job, 1);
+    tt2.reserveSlots(TaskType.MAP, job, 1);
+    tt2.reserveSlots(TaskType.REDUCE, job, 1);
+    assertEquals("Tracker 1 not reserved for the job 1", 2, job
+        .getNumReservedTaskTrackersForMaps());
+    assertEquals("Tracker 1 not reserved for the job 1", 2, job
+        .getNumReservedTaskTrackersForReduces());
+    runBlackListingJob(jobTracker, trackers);
+    assertEquals("Tracker 1 not unreserved for the job 1", 1, job
+        .getNumReservedTaskTrackersForMaps());
+    assertEquals("Tracker 1 not unreserved for the job 1", 1, job
+        .getNumReservedTaskTrackersForReduces());
+    assertEquals("Tracker 1 not blacklisted", jobTracker
+        .getBlacklistedTrackerCount(), 1);
+    checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
+    
+    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    sendHeartBeat(status, false);
+    assertEquals("All trackers not blacklisted", jobTracker
+        .getBlacklistedTrackerCount(), 3);
+    
+    checkReasonForBlackListing(hosts[0], unhealthyAndExceedsFailure);
+    checkReasonForBlackListing(hosts[1], nodeUnHealthyReasonSet);
+    checkReasonForBlackListing(hosts[2], nodeUnHealthyReasonSet);
+    
+    assertEquals("Tracker 1 not unreserved for the job 1", 0, job
+        .getNumReservedTaskTrackersForMaps());
+    assertEquals("Tracker 1 not unreserved for the job 1", 0, job
+        .getNumReservedTaskTrackersForReduces());
+    //white list all trackers for health reasons
+    sendHeartBeat(null, false);
+    clock.jumpADay = false;
   }
 
   /**