You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/05 12:04:23 UTC

svn commit: r723710 - in /hadoop/core/trunk: ./ conf/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Author: ddas
Date: Fri Dec  5 03:04:17 2008
New Revision: 723710

URL: http://svn.apache.org/viewvc?rev=723710&view=rev
Log:
HADOOP-4305. Improves the blacklisting strategy, whereby, tasktrackers that are blacklisted are not given tasks to run from other jobs, subject to the following conditions (all must be met): 1) The TaskTracker has been blacklisted by at least 4 jobs (configurable) 2) The TaskTracker has been blacklisted 50% more number of times than the average (configurable) 3) The cluster has less than 50% trackers blacklisted.  Once in 24 hours, a TaskTracker blacklisted for all jobs is given a chance. Restarting the TaskTracker moves it out of the blacklist. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/webapps/job/jobtracker.jsp
    hadoop/core/trunk/src/webapps/job/machines.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec  5 03:04:17 2008
@@ -176,6 +176,17 @@
     HADOOP-4747. Speed up FsShell::ls by removing redundant calls to the
     filesystem. (David Phillips via cdouglas)
 
+    HADOOP-4305. Improves the blacklisting strategy, whereby, tasktrackers
+    that are blacklisted are not given tasks to run from other jobs, subject
+    to the following conditions (all must be met):
+    1) The TaskTracker has been blacklisted by at least 4 jobs (configurable)
+    2) The TaskTracker has been blacklisted 50% more number of times than
+       the average (configurable)
+    3) The cluster has less than 50% trackers blacklisted
+    Once in 24 hours, a TaskTracker blacklisted for all jobs is given a chance.
+    Restarting the TaskTracker moves it out of the blacklist.
+    (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Dec  5 03:04:17 2008
@@ -1296,6 +1296,17 @@
 </property> 
 
 <property>
+  <name>mapred.max.tracker.blacklists</name>
+  <value>4</value>
+  <description>The number of blacklists for a taskTracker by various jobs 
+               after which the task tracker could be blacklisted across
+               all jobs. The tracker will be given tasks later 
+               (after a day). The tracker will become a healthy 
+               tracker after a restart. 
+  </description>
+</property>
+
+<property>
   <name>mapred.max.tracker.failures</name>
   <value>4</value>
   <description>The number of task-failures on a tasktracker of a given job 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Dec  5 03:04:17 2008
@@ -52,6 +52,7 @@
 public class ClusterStatus implements Writable {
 
   private int task_trackers;
+  private int blacklisted_trackers;
   private int map_tasks;
   private int reduce_tasks;
   private int max_map_tasks;
@@ -73,7 +74,24 @@
    */
   ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
                 int maxReduces, JobTracker.State state) {
+    this(trackers, 0, maps, reduces, maxMaps, maxReduces, state);
+  }
+  
+  /**
+   * Construct a new cluster status.
+   * 
+   * @param trackers no. of tasktrackers in the cluster
+   * @param blacklists no. of blacklisted task trackers in the cluster
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
+   */
+  ClusterStatus(int trackers, int blacklists, int maps, int reduces,
+                int maxMaps, int maxReduces, JobTracker.State state) {
     task_trackers = trackers;
+    blacklisted_trackers = blacklists;
     map_tasks = maps;
     reduce_tasks = reduces;
     max_map_tasks = maxMaps;
@@ -82,7 +100,6 @@
     used_memory = Runtime.getRuntime().totalMemory();
     max_memory = Runtime.getRuntime().maxMemory();
   }
-  
 
   /**
    * Get the number of task trackers in the cluster.
@@ -94,6 +111,15 @@
   }
   
   /**
+   * Get the number of blacklisted task trackers in the cluster.
+   * 
+   * @return the number of blacklisted task trackers in the cluster.
+   */
+  public int getBlacklistedTrackers() {
+    return blacklisted_trackers;
+  }
+  
+  /**
    * Get the number of currently running map tasks in the cluster.
    * 
    * @return the number of currently running map tasks in the cluster.
@@ -159,6 +185,7 @@
 
   public void write(DataOutput out) throws IOException {
     out.writeInt(task_trackers);
+    out.writeInt(blacklisted_trackers);
     out.writeInt(map_tasks);
     out.writeInt(reduce_tasks);
     out.writeInt(max_map_tasks);
@@ -170,6 +197,7 @@
 
   public void readFields(DataInput in) throws IOException {
     task_trackers = in.readInt();
+    blacklisted_trackers = in.readInt();
     map_tasks = in.readInt();
     reduce_tasks = in.readInt();
     max_map_tasks = in.readInt();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Dec  5 03:04:17 2008
@@ -55,8 +55,10 @@
    * Version 21: Changed information reported in TaskTrackerStatus'
    *             ResourceStatus and the corresponding accessor methods
    *             (HADOOP-4035)
+   * Version 22: Replaced parameter 'initialContact' with 'restarted' 
+   *             in heartbeat method (HADOOP-4305) 
    */
-  public static final long versionID = 21L;
+  public static final long versionID = 22L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -73,8 +75,8 @@
    * it recieved from the {@link JobTracker} 
    * 
    * @param status the status update
-   * @param initialContact <code>true</code> if this is first interaction since
-   *                       'refresh', <code>false</code> otherwise.
+   * @param restarted <code>true</code> if the process has just started or 
+   *                   restarted, <code>false</code> otherwise
    * @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
    *                       ready to accept new tasks to run.                 
    * @param responseId the last responseId successfully acted upon by the
@@ -83,7 +85,9 @@
    *         fresh instructions.
    */
   HeartbeatResponse heartbeat(TaskTrackerStatus status, 
-                              boolean initialContact, boolean acceptNewTasks, short responseId)
+                              boolean restarted, 
+                              boolean acceptNewTasks,
+                              short responseId)
     throws IOException;
   
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Dec  5 03:04:17 2008
@@ -1701,8 +1701,8 @@
     }
 
     String taskTrackerName =
-        JobInProgress.convertTrackerNameToHostName(
-            attempt.get(Keys.TRACKER_NAME)).substring("tracker_".length());
+      JobInProgress.convertTrackerNameToHostName(
+        attempt.get(Keys.TRACKER_NAME));
     return TaskLogServlet.getTaskLogUrl(taskTrackerName, attempt
         .get(Keys.HTTP_PORT), attempt.get(Keys.TASK_ATTEMPT_ID));
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Dec  5 03:04:17 2008
@@ -1203,7 +1203,7 @@
     String trackerHostName = (indexOfColon == -1) ? 
       trackerName : 
       trackerName.substring(0, indexOfColon);
-    return trackerHostName;
+    return trackerHostName.substring("tracker_".length());
   }
     
   /**
@@ -1238,6 +1238,21 @@
   }
     
   /**
+   * Get the black listed trackers for the job
+   * 
+   * @return List of blacklisted tracker names
+   */
+  List<String> getBlackListedTrackers() {
+    List<String> blackListedTrackers = new ArrayList<String>();
+    for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
+       if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
+         blackListedTrackers.add(e.getKey());
+       }
+    }
+    return blackListedTrackers;
+  }
+  
+  /**
    * Get the no. of 'flaky' tasktrackers for a given job.
    * 
    * @return the no. of 'flaky' tasktrackers for a given job.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Dec  5 03:04:17 2008
@@ -51,8 +51,10 @@
    *             setupProgress to JobStatus as part of HADOOP-4261           
    * Version 17: getClusterStatus returns the amount of memory used by 
    *             the server. HADOOP-4435
+   * Version 18: Added blacklisted trackers to the ClusterStatus 
+   *             for HADOOP-4305            
    */
-  public static final long versionID = 17L;
+  public static final long versionID = 18L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Dec  5 03:04:17 2008
@@ -83,6 +83,19 @@
   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
   static long RETIRE_JOB_INTERVAL;
   static long RETIRE_JOB_CHECK_INTERVAL;
+  // The interval after which one fault of a tracker will be discarded,
+  // if there are no faults during this interval.
+  private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
+  // The maximum percentage of trackers in cluster added 
+  // to the 'blacklist' across all the jobs.
+  private static double MAX_BLACKLIST_PERCENT = 0.50;
+  // A tracker is blacklisted across jobs only if its number of 
+  // blacklists is X% above the average number of blacklists.
+  // X is the blacklist threshold here.
+  private double AVERAGE_BLACKLIST_THRESHOLD = 0.50;
+  // The maximum number of blacklists for a tracker after which the 
+  // tracker could be blacklisted across all jobs
+  private int MAX_BLACKLISTS_PER_TRACKER = 4;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
@@ -313,6 +326,11 @@
                     if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
                       // Remove completely after marking the tasks as 'KILLED'
                       lostTaskTracker(leastRecent.getTrackerName());
+                      // tracker is lost, and if it is blacklisted, remove 
+                      // it from the count of blacklisted trackers in the cluster
+                      if (isBlacklisted(trackerName)) {
+                        faultyTrackers.numBlacklistedTrackers -= 1;
+                      }
                       updateTaskTrackerStatus(trackerName, null);
                     } else {
                       // Update time by inserting latest profile
@@ -402,8 +420,236 @@
       }
     }
   }
+  
+  // The FaultInfo which indicates the number of faults of a tracker
+  // and when the last fault occurred
+  // and whether the tracker is blacklisted across all jobs or not
+  private static class FaultInfo {
+    int numFaults = 0;
+    long lastUpdated;
+    boolean blacklisted; 
 
- 
+    FaultInfo() {
+      numFaults = 0;
+      lastUpdated = System.currentTimeMillis();
+      blacklisted = false;
+    }
+
+    void setFaultCount(int num) {
+      numFaults = num;
+    }
+
+    void setLastUpdated(long timeStamp) {
+      lastUpdated = timeStamp;
+    }
+
+    int getFaultCount() {
+      return numFaults;
+    }
+
+    long getLastUpdated() {
+      return lastUpdated;
+    }
+    
+    boolean isBlacklisted() {
+      return blacklisted;
+    }
+    
+    void setBlacklist(boolean blacklist) {
+      blacklisted = blacklist;
+    }
+  }
+
+  private class FaultyTrackersInfo {
+    // A map from hostName to its faults
+    private Map<String, FaultInfo> potentiallyFaultyTrackers = 
+              new HashMap<String, FaultInfo>();
+    // This count gives the number of blacklisted trackers in the cluster 
+    // at any time. This is maintained to avoid iteration over 
+    // the potentiallyFaultyTrackers to get blacklisted trackers. And also
+    // this count doesn't include blacklisted trackers which are lost, 
+    // although the fault info is maintained for lost trackers.  
+    private volatile int numBlacklistedTrackers = 0;
+
+    /**
+     * Increments faults(blacklist by job) for the tracker by one.
+     * 
+     * Adds the tracker to the potentially faulty list. 
+     * 
+     * @param hostName 
+     */
+    void incrementFaults(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        if (fi == null) {
+          fi = new FaultInfo();
+          potentiallyFaultyTrackers.put(hostName, fi);
+        }
+        int numFaults = fi.getFaultCount();
+        ++numFaults;
+        fi.setFaultCount(numFaults);
+        fi.setLastUpdated(System.currentTimeMillis());
+        if (!fi.isBlacklisted()) {
+          if (shouldBlacklist(hostName, numFaults)) {
+            LOG.info("Adding " + hostName + " to the blacklist" +
+                     " across all jobs");
+            removeHostCapacity(hostName);
+            fi.setBlacklist(true);
+          }
+        }
+      }        
+    }
+
+    /**
+     * Blacklists the tracker across all jobs if
+     * <ol>
+     * <li>#faults is at least 
+     *     MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
+     * <li>#faults is more than 50% (configurable) above the average #faults</li>
+     * <li>less than 50% of the cluster is blacklisted so far</li></ol>
+     */
+    private boolean shouldBlacklist(String hostName, int numFaults) {
+      if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+        // calculate avgBlackLists
+        long clusterSize = getClusterStatus().getTaskTrackers();
+        long sum = 0;
+        for (FaultInfo f : potentiallyFaultyTrackers.values()) {
+          sum += f.getFaultCount();
+        }
+        double avg = (double) sum / clusterSize;
+            
+        long totalCluster = clusterSize + numBlacklistedTrackers;
+        if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+            numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    /**
+     * Removes the tracker from blacklist and
+     * from potentially faulty list, when it is restarted.
+     * 
+     * @param hostName
+     */
+    void markTrackerHealthy(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = potentiallyFaultyTrackers.remove(hostName);
+        if (fi != null && fi.isBlacklisted()) {
+          LOG.info("Removing " + hostName + " from blacklist");
+          addHostCapacity(hostName);
+        }
+      }
+    }
+
+    /**
+     * Check whether tasks can be assigned to the tracker.
+     *
+     * One fault of the tracker is discarded if there
+     * are no faults during one day. So, the tracker will get a 
+     * chance again to run tasks of a job.
+     * 
+     * @param hostName The tracker name
+     * @return true if the tracker is blacklisted 
+     *         false otherwise
+     */
+    boolean shouldAssignTasksToTracker(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        long now = System.currentTimeMillis(); 
+        if (fi != null &&
+            (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
+          int numFaults = fi.getFaultCount() - 1;
+          if (fi.isBlacklisted()) {
+            LOG.info("Removing " + hostName + " from blacklist");
+            addHostCapacity(hostName);
+            fi.setBlacklist(false);
+          }
+          if (numFaults > 0) {
+            fi.setFaultCount(numFaults);
+            fi.setLastUpdated(now);
+          } else {
+            potentiallyFaultyTrackers.remove(hostName);
+          }
+        }
+        return (fi != null && fi.isBlacklisted());
+      }
+    }
+
+    private void removeHostCapacity(String hostName) {
+      synchronized (taskTrackers) {
+        // remove the capacity of trackers on this host
+        for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+          totalMapTaskCapacity -= status.getMaxMapTasks();
+          totalReduceTaskCapacity -= status.getMaxReduceTasks();
+        }
+        numBlacklistedTrackers +=
+          uniqueHostsMap.remove(hostName);
+      }
+    }
+    
+    // This is called on tracker's restart or after a day of blacklist.
+    private void addHostCapacity(String hostName) {
+      synchronized (taskTrackers) {
+        int numTrackersOnHost = 0;
+        // add the capacity of trackers on the host
+        for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+          totalMapTaskCapacity += status.getMaxMapTasks();
+          totalReduceTaskCapacity += status.getMaxReduceTasks();
+          numTrackersOnHost++;
+        }
+        uniqueHostsMap.put(hostName,
+                           numTrackersOnHost);
+        numBlacklistedTrackers -= numTrackersOnHost;
+      }
+    }
+
+    /**
+     * Whether a host is blacklisted across all the jobs. 
+     * 
+     * @param hostName
+     * @return
+     */
+    boolean isBlacklisted(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = null;
+        if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+          return fi.isBlacklisted();
+        }
+      }
+      return false;
+    }
+    
+    int getFaultCount(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = null;
+        if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+          return fi.getFaultCount();
+        }
+      }
+      return 0;
+    }
+  }
+  
+  /**
+   * Get all task tracker statuses on given host
+   * 
+   * @param hostName
+   * @return {@link java.util.List} of {@link TaskTrackerStatus}
+   */
+  private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
+    List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+    synchronized (taskTrackers) {
+      for (TaskTrackerStatus status : taskTrackers.values()) {
+        if (hostName.equals(status.getHost())) {
+          statuses.add(status);
+        }
+      }
+    }
+    return statuses;
+  }
+  
   ///////////////////////////////////////////////////////
   // Used to recover the jobs upon restart
   ///////////////////////////////////////////////////////
@@ -675,9 +921,6 @@
       String trackerName = attempt.get(Keys.TRACKER_NAME);
       String trackerHostName = 
         JobInProgress.convertTrackerNameToHostName(trackerName);
-      int index = trackerHostName.indexOf("_");
-      trackerHostName = 
-        trackerHostName.substring(index + 1, trackerHostName.length());
       int port = attempt.getInt(Keys.HTTP_PORT);
       
       long attemptStartTime = attempt.getLong(Keys.START_TIME);
@@ -991,6 +1234,8 @@
   // Number of resolved entries
   int numResolved;
     
+  private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
+  
   //
   // Watch and expire TaskTracker objects using these structures.
   // We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -1063,6 +1308,14 @@
     RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
     RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
+    MAX_BLACKLISTS_PER_TRACKER = 
+        conf.getInt("mapred.max.tracker.blacklists", 4);
+
+    //This configuration is there solely for tuning purposes and 
+    //once this feature has been tested in real clusters and an appropriate
+    //value for the threshold has been found, this config might be taken out.
+    AVERAGE_BLACKLIST_THRESHOLD = 
+      conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
 
     // This is a directory of temporary submission files.  We delete it
     // on startup, and can delete any files that we're done with
@@ -1538,6 +1791,15 @@
 
     long now = System.currentTimeMillis();
     
+    // add the blacklisted trackers to potentially faulty list
+    if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+      if (job.getNoOfBlackListedTrackers() > 0) {
+        for (String hostName : job.getBlackListedTrackers()) {
+          faultyTrackers.incrementFaults(hostName);
+        }
+      }
+    }
+    
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
     // JobHistory.
@@ -1681,11 +1943,82 @@
     }
     return v;
   }
+
+  /**
+   * Get all the task trackers in the cluster
+   * 
+   * @return {@link Collection} of {@link TaskTrackerStatus} 
+   */
   public Collection<TaskTrackerStatus> taskTrackers() {
     synchronized (taskTrackers) {
       return taskTrackers.values();
     }
   }
+  
+  /**
+   * Get the active task tracker statuses in the cluster
+   *  
+   * @return {@link Collection} of active {@link TaskTrackerStatus} 
+   */
+  public Collection<TaskTrackerStatus> activeTaskTrackers() {
+    Collection<TaskTrackerStatus> activeTrackers = 
+      new ArrayList<TaskTrackerStatus>();
+    synchronized (taskTrackers) {
+      for (TaskTrackerStatus status : taskTrackers.values()) {
+        if (!faultyTrackers.isBlacklisted(status.getHost())) {
+          activeTrackers.add(status);
+        }
+      }
+    }
+    return activeTrackers;
+  }
+  
+  /**
+   * Get the blacklisted task tracker statuses in the cluster
+   *  
+   * @return {@link Collection} of blacklisted {@link TaskTrackerStatus} 
+   */
+  public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
+    Collection<TaskTrackerStatus> blacklistedTrackers = 
+      new ArrayList<TaskTrackerStatus>();
+    synchronized (taskTrackers) {
+      for (TaskTrackerStatus status : taskTrackers.values()) {
+        if (faultyTrackers.isBlacklisted(status.getHost())) {
+          blacklistedTrackers.add(status);
+        }
+      }
+    }    
+    return blacklistedTrackers;
+  }
+
+  int getFaultCount(String hostName) {
+    return faultyTrackers.getFaultCount(hostName);
+  }
+  
+  /**
+   * Get the number of blacklisted trackers across all the jobs
+   * 
+   * @return
+   */
+  int getBlacklistedTrackerCount() {
+    return faultyTrackers.numBlacklistedTrackers;
+  }
+
+  /**
+   * Whether the tracker is blacklisted or not
+   * 
+   * @param trackerID
+   * 
+   * @return true if blacklisted, false otherwise
+   */
+  public boolean isBlacklisted(String trackerID) {
+    TaskTrackerStatus status = getTaskTracker(trackerID);
+    if (status != null) {
+      return faultyTrackers.isBlacklisted(status.getHost());
+    }
+    return false;
+  }
+  
   public TaskTrackerStatus getTaskTracker(String trackerID) {
     synchronized (taskTrackers) {
       return taskTrackers.get(trackerID);
@@ -1810,10 +2143,12 @@
    * tasks or jobs, and also 'reset' instructions during contingencies. 
    */
   public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
-                                                  boolean initialContact, boolean acceptNewTasks, short responseId) 
+                                                  boolean restarted,
+                                                  boolean acceptNewTasks, 
+                                                  short responseId) 
     throws IOException {
     LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
-              " (initialContact: " + initialContact + 
+              " (restarted: " + restarted + 
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " with responseId: " + responseId);
 
@@ -1824,12 +2159,19 @@
 
     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
+    boolean isBlacklisted = false;
+    if (restarted) {
+      faultyTrackers.markTrackerHealthy(status.getHost());
+    } else {
+      isBlacklisted = 
+        faultyTrackers.shouldAssignTasksToTracker(status.getHost());
+    }
     
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
     boolean addRestartInfo = false;
 
-    if (initialContact != true) {
+    if (restarted != true) {
       // If this isn't the 'initial contact' from the tasktracker,
       // there is something seriously wrong if the JobTracker has
       // no record of the 'previous heartbeat'; if so, ask the 
@@ -1841,6 +2183,7 @@
           addRestartInfo = true;
         } else {
           // Jobtracker might have restarted but no recovery is needed
+          // otherwise this code should not be reached
           LOG.warn("Serious problem, cannot find record of 'previous' " +
                    "heartbeat for '" + trackerName + 
                    "'; reinitializing the tasktracker");
@@ -1865,13 +2208,9 @@
       
     // Process this heartbeat 
     short newResponseId = (short)(responseId + 1);
-    if (!processHeartbeat(status, initialContact)) {
-      if (prevHeartbeatResponse != null) {
-        trackerToHeartbeatResponseMap.remove(trackerName);
-      }
-
+    if (!processHeartbeat(status, restarted)) {
       return new HeartbeatResponse(newResponseId, 
-                                   new TaskTrackerAction[] {new ReinitTrackerAction()});
+                   new TaskTrackerAction[] {new ReinitTrackerAction()});
     }
       
     // Initialize the response to be sent for the heartbeat
@@ -1879,7 +2218,7 @@
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
       
     // Check for new tasks to be executed on the tasktracker
-    if (acceptNewTasks) {
+    if (acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -1983,8 +2322,10 @@
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
-      totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
-      totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+      if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
+        totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
+        totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+      }
       if (status == null) {
         taskTrackers.remove(trackerName);
         Integer numTaskTrackersInHost = 
@@ -2001,8 +2342,10 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
-      totalMapTaskCapacity += status.getMaxMapTasks();
-      totalReduceTaskCapacity += status.getMaxReduceTasks();
+      if (!faultyTrackers.isBlacklisted(status.getHost())) {
+        totalMapTaskCapacity += status.getMaxMapTasks();
+        totalReduceTaskCapacity += status.getMaxReduceTasks();
+      }
       boolean alreadyPresent = false;
       if (taskTrackers.containsKey(trackerName)) {
         alreadyPresent = true;
@@ -2026,7 +2369,8 @@
    * Process incoming heartbeat messages from the task trackers.
    */
   private synchronized boolean processHeartbeat(
-                                                TaskTrackerStatus trackerStatus, boolean initialContact) {
+                                 TaskTrackerStatus trackerStatus, 
+                                 boolean restarted) {
     String trackerName = trackerStatus.getTrackerName();
     trackerStatus.setLastSeen(System.currentTimeMillis());
 
@@ -2034,7 +2378,7 @@
       synchronized (trackerExpiryQueue) {
         boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                      trackerStatus);
-        if (initialContact) {
+        if (restarted) {
           // If it's first contact, then clear out 
           // any state hanging around
           if (seenBefore) {
@@ -2044,12 +2388,17 @@
           // If not first contact, there should be some record of the tracker
           if (!seenBefore) {
             LOG.warn("Status from unknown Tracker : " + trackerName);
-            updateTaskTrackerStatus(trackerName, null);
+            // This is a lost tracker that has come back now; if it is blacklisted,
+            // increment the count of blacklisted trackers in the cluster
+            if (isBlacklisted(trackerName)) {
+              faultyTrackers.numBlacklistedTrackers += 1;
+            }
+            addNewTracker(trackerStatus);
             return false;
           }
         }
 
-        if (initialContact) {
+        if (restarted) {
           addNewTracker(trackerStatus);
         }
       }
@@ -2282,7 +2631,9 @@
 
   public synchronized ClusterStatus getClusterStatus() {
     synchronized (taskTrackers) {
-      return new ClusterStatus(taskTrackers.size(),
+      return new ClusterStatus(taskTrackers.size() - 
+                                 getBlacklistedTrackerCount(),
+                               getBlacklistedTrackerCount(),
                                totalMaps,
                                totalReduces,
                                totalMapTaskCapacity,

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Dec  5 03:04:17 2008
@@ -501,7 +501,6 @@
     // Clear out temporary files that might be lying around
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
-    this.justStarted = true;
 
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
@@ -1848,6 +1847,8 @@
       task.setJobFile(localTaskFile.toString());
       localJobConf.set("mapred.local.dir",
                        fConf.get("mapred.local.dir"));
+      localJobConf.set("slave.host.name",
+                       fConf.get("slave.host.name"));
             
       localJobConf.set("mapred.task.id", task.getTaskID().toString());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Dec  5 03:04:17 2008
@@ -528,6 +528,10 @@
   public int getNumEventsRecovered() {
     return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
   }
+
+  public int getFaultCount(String hostName) {
+    return jobTracker.getJobTracker().getFaultCount(hostName);
+  }
   
   /**
    * Start the jobtracker.

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=723710&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Fri Dec  5 03:04:17 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob.SleepInputFormat;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+import junit.framework.TestCase;
+
+public class TestTrackerBlacklistAcrossJobs extends TestCase {
+  private static final String hosts[] = new String[] {
+    "host1.rack.com", "host2.rack.com", "host3.rack.com"
+  };
+  final Path inDir = new Path("/testing");
+  final Path outDir = new Path("/output");
+
+  public static class SleepJobFailOnHost extends MapReduceBase
+    implements Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    String hostname = "";
+    
+    public void configure(JobConf job) {
+      this.hostname = job.get("slave.host.name");
+    }
+    
+    public void map(IntWritable key, IntWritable value,
+                    OutputCollector<IntWritable, NullWritable> output,
+                    Reporter reporter)
+    throws IOException {
+      if (this.hostname.equals(hosts[0])) {
+        // fail here
+        throw new IOException("failing on host: " + hosts[0]);
+      }
+    }
+  }
+  
+  public void testBlacklistAcrossJobs() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    Configuration conf = new Configuration();
+    // setup dfs and input
+    dfs = new MiniDFSCluster(conf, 1, true, null, hosts);
+    fileSys = dfs.getFileSystem();
+    if (!fileSys.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
+                                 new Path(inDir + "/file"), (short) 1);
+    // start mr cluster
+    JobConf jtConf = new JobConf();
+    jtConf.setInt("mapred.max.tracker.failures", 1);
+    jtConf.setInt("mapred.max.tracker.blacklists", 1);
+    mr = new MiniMRCluster(3, fileSys.getUri().toString(),
+                           1, null, hosts, jtConf);
+
+    // setup job configuration
+    JobConf mrConf = mr.createJobConf();
+    JobConf job = new JobConf(mrConf);
+    job.setNumMapTasks(30);
+    job.setNumReduceTasks(0);
+    job.setMapperClass(SleepJobFailOnHost.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setInputFormat(SleepInputFormat.class);
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    
+    // run the job
+    JobClient jc = new JobClient(mrConf);
+    RunningJob running = JobClient.runJob(job);
+    assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
+    assertEquals("Didn't blacklist the host", 1, 
+      jc.getClusterStatus().getBlacklistedTrackers());
+    assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
+
+    // run the same job once again 
+    // there should be no change in blacklist count
+    running = JobClient.runJob(job);
+    assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
+    assertEquals("Didn't blacklist the host", 1,
+      jc.getClusterStatus().getBlacklistedTrackers());
+    assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
+  }
+}

Modified: hadoop/core/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtracker.jsp?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtracker.jsp Fri Dec  5 03:04:17 2008
@@ -31,15 +31,18 @@
               "<tr><th>Maps</th><th>Reduces</th>" + 
               "<th>Total Submissions</th>" +
               "<th>Nodes</th><th>Map Task Capacity</th>" +
-              "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th></tr>\n");
+              "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" + 
+              "<th>Blacklisted Nodes</th></tr>\n");
     out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
               status.getReduceTasks() + "</td><td>" + 
               tracker.getTotalSubmissions() +
-              "</td><td><a href=\"machines.jsp\">" +
+              "</td><td><a href=\"machines.jsp?type=active\">" +
               status.getTaskTrackers() +
               "</a></td><td>" + status.getMaxMapTasks() +
               "</td><td>" + status.getMaxReduceTasks() +
               "</td><td>" + tasksPerNode +
+              "</td><td><a href=\"machines.jsp?type=blacklisted\">" +
+              status.getBlacklistedTrackers() + "</a>" +
               "</td></tr></table>\n");
 
     out.print("<br>");

Modified: hadoop/core/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/machines.jsp?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/machines.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/machines.jsp Fri Dec  5 03:04:17 2008
@@ -12,14 +12,22 @@
   JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+  String type = request.getParameter("type");
 %>
 <%!
   public void generateTaskTrackerTable(JspWriter out,
+                                       String type,
                                        JobTracker tracker) throws IOException {
-    Collection c = tracker.taskTrackers();
-
+    Collection c;
+    if (("blacklisted").equals(type)) {
+      c = tracker.blacklistedTaskTrackers();
+    } else if (("active").equals(type)) {
+      c = tracker.activeTaskTrackers();
+    } else {
+      c = tracker.taskTrackers();
+    }
     if (c.size() == 0) {
-      out.print("There are currently no known Task Trackers.");
+      out.print("There are currently no known " + type + " Task Trackers.");
     } else {
       out.print("<center>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
@@ -77,7 +85,7 @@
 
 <h2>Task Trackers</h2>
 <%
-  generateTaskTrackerTable(out, tracker);
+  generateTaskTrackerTable(out, type, tracker);
 %>
 
 <%