You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/21 00:27:57 UTC

svn commit: r1221524 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...

Author: vinodkv
Date: Tue Dec 20 23:27:57 2011
New Revision: 1221524

URL: http://svn.apache.org/viewvc?rev=1221524&view=rev
Log:
MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the number of nodes blacklisted crosses a threshold. Contributed by Siddharth Seth.
svn merge -c 1221523 --ignore-ancestry ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Dec 20 23:27:57 2011
@@ -294,6 +294,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
     old MR api. (Subroto Sanyal via acmurthy)
 
+    MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the
+    number of nodes blacklisted crosses a threshold. (Siddharth Seth via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Dec 20 23:27:57 2011
@@ -68,6 +68,7 @@ public abstract class RMCommunicator ext
   protected ApplicationAttemptId applicationAttemptId;
   private AtomicBoolean stopped;
   protected Thread allocatorThread;
+  @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
   protected AMRMProtocol scheduler;
   private final ClientService clientService;

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Dec 20 23:27:57 2011
@@ -479,12 +479,16 @@ public class RMContainerAllocator extend
       //something changed
       recalculateReduceSchedule = true;
     }
-    
-    List<Container> allocatedContainers = new ArrayList<Container>();
-    for (Container cont : newContainers) {
-        allocatedContainers.add(cont);
+
+    if (LOG.isDebugEnabled()) {
+      for (Container cont : newContainers) {
         LOG.debug("Received new Container :" + cont);
+      }
     }
+
+    //Called on each allocation. Will know about newly blacklisted/added hosts.
+    computeIgnoreBlacklisting();
+    
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont);
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Tue Dec 20 23:27:57 2011
@@ -18,15 +18,15 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
+
 /**
  * Keeps the data structures to send container requests to RM.
  */
@@ -74,9 +76,15 @@ public abstract class RMContainerRequest
   private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
 
   private boolean nodeBlacklistingEnabled;
+  private int blacklistDisablePercent;
+  private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
+  private int blacklistedNodeCount = 0;
+  private int lastClusterNmCount = 0;
+  private int clusterNmCount = 0;
   private int maxTaskFailuresPerNode;
   private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
-  private final Set<String> blacklistedNodes = new HashSet<String>();
+  private final Set<String> blacklistedNodes = Collections
+      .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -122,7 +130,17 @@ public abstract class RMContainerRequest
     LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
     maxTaskFailuresPerNode = 
       conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+    blacklistDisablePercent =
+        conf.getInt(
+            MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+            MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+    if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+      throw new YarnException("Invalid blacklistDisablePercent: "
+          + blacklistDisablePercent
+          + ". Should be an integer between 0 and 100 or -1 to disabled");
+    }
+    LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
   protected AMResponse makeRemoteRequest() throws YarnRemoteException {
@@ -134,19 +152,49 @@ public abstract class RMContainerRequest
     AMResponse response = allocateResponse.getAMResponse();
     lastResponseID = response.getResponseId();
     availableResources = response.getAvailableResources();
+    lastClusterNmCount = clusterNmCount;
+    clusterNmCount = allocateResponse.getNumClusterNodes();
 
     LOG.info("getResources() for " + applicationId + ":" + " ask="
         + ask.size() + " release= " + release.size() + 
         " newContainers=" + response.getAllocatedContainers().size() + 
         " finishedContainers=" + 
         response.getCompletedContainersStatuses().size() + 
-        " resourcelimit=" + availableResources);
+        " resourcelimit=" + availableResources + 
+        "knownNMs=" + clusterNmCount);
 
     ask.clear();
     release.clear();
     return response;
   }
 
+  // May be incorrect if there are multiple NodeManagers running on a single
+  // host: clusterNmCount is based on node managers, not hosts, whereas
+  // blacklisting is currently based on hosts.
+  protected void computeIgnoreBlacklisting() {
+    if (blacklistDisablePercent != -1
+        && (blacklistedNodeCount != blacklistedNodes.size() ||
+            clusterNmCount != lastClusterNmCount)) {
+      blacklistedNodeCount = blacklistedNodes.size();
+      if (clusterNmCount == 0) {
+        LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+        return;
+      }
+      int val = (int) ((float) blacklistedNodes.size() / clusterNmCount * 100);
+      if (val >= blacklistDisablePercent) {
+        if (ignoreBlacklisting.compareAndSet(false, true)) {
+          LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
+              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+        }
+      } else {
+        if (ignoreBlacklisting.compareAndSet(true, false)) {
+          LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
+              + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+        }
+      }
+    }
+  }
+  
   protected void containerFailedOnHost(String hostName) {
     if (!nodeBlacklistingEnabled) {
       return;
@@ -161,8 +209,10 @@ public abstract class RMContainerRequest
     LOG.info(failures + " failures on node " + hostName);
     if (failures >= maxTaskFailuresPerNode) {
       blacklistedNodes.add(hostName);
+      //Even if blacklisting is ignored, continue to remove the host from
+      // the request table. The RM may have additional nodes it can allocate on.
       LOG.info("Blacklisted host " + hostName);
-      
+
       //remove all the requests corresponding to this hostname
       for (Map<String, Map<Resource, ResourceRequest>> remoteRequests 
           : remoteRequestsTable.values()){
@@ -316,7 +366,7 @@ public abstract class RMContainerRequest
   }
   
   protected boolean isNodeBlacklisted(String hostname) {
-    if (!nodeBlacklistingEnabled) {
+    if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
       return false;
     }
     return blacklistedNodes.contains(hostname);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 20 23:27:57 2011
@@ -488,6 +488,8 @@ public class TestRMContainerAllocator {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();
@@ -581,12 +583,183 @@ public class TestRMContainerAllocator {
   }
   
   @Test
+  public void testIgnoreBlacklisting() throws Exception {
+    LOG.info("Running testIgnoreBlacklisting");
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher =
+        (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM[] nodeManagers = new MockNM[10];
+    int nmNum = 0;
+    List<TaskAttemptContainerAssignedEvent> assigned = null;
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    nodeManagers[0].nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false));
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
+
+    // Known=1, blacklisted=0, ignore should be false - assign first container
+    assigned =
+        getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+        + " ignore blacklisting enabled");
+    // Send a failure event to blacklist node h1 (h2 is blacklisted later)
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+    allocator.sendFailure(f1);
+
+    // Test single node.
+    // Known=1, blacklisted=1, ignore should be true - assign 1
+    assigned =
+        getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+    assigned =
+        getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+            nodeManagers[1], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
+    assigned =
+        getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Known=3, blacklisted=1, ignore should be true - assign 1
+    assigned =
+        getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
+    assigned =
+        getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+            nodeManagers[3], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Test blacklisting re-enabled.
+    // Known=4, blacklisted=1, ignore should be false - no assignment on h1
+    assigned =
+        getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+    // RMContainerRequestor would have created a replacement request.
+
+    // Blacklist h2
+    ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
+    allocator.sendFailure(f2);
+
+    // Test ignore blacklisting re-enabled
+    // Known=4, blacklisted=2, ignore should be true. Should assign 2
+    // containers.
+    assigned =
+        getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+            nodeManagers[0], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+
+    // Known=4, blacklisted=2, ignore should be true.
+    assigned =
+        getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+            nodeManagers[1], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    // Test blacklist while ignore blacklisting enabled
+    ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
+    allocator.sendFailure(f3);
+
+    nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+    // Known=5, blacklisted=3, ignore should be true.
+    assigned =
+        getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+    
+    // Assign on 5 more nodes - to re-enable blacklisting
+    for (int i = 0; i < 5; i++) {
+      nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+      assigned =
+          getContainerOnHost(jobId, 11 + i, 1024,
+              new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+              dispatcher, allocator);
+      Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+    }
+
+    // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
+    assigned =
+        getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+            nodeManagers[2], dispatcher, allocator);
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+  }
+
+  private MockNM registerNodeManager(int i, MyResourceManager rm,
+      DrainDispatcher dispatcher) throws Exception {
+    MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240);
+    dispatcher.await();
+    return nm;
+  }
+
+  private
+      List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
+          int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
+          DrainDispatcher dispatcher, MyContainerAllocator allocator)
+          throws Exception {
+    ContainerRequestEvent reqEvent =
+        createReq(jobId, taskAttemptId, memory, hosts);
+    allocator.sendRequest(reqEvent);
+
+    // Send the request to the RM
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // Heartbeat from the required nodeManager
+    mockNM.nodeHeartbeat(true);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    return assigned;
+  }
+ 
+  @Test
   public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
     LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
 
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
     conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    conf.setInt(
+        MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
     
     MyResourceManager rm = new MyResourceManager(conf);
     rm.start();

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 20 23:27:57 2011
@@ -348,8 +348,14 @@ public interface MRJobConfig {
 
   /** Enable blacklisting of nodes in the job.*/
   public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = 
-    MR_AM_PREFIX  + "job.node.blacklisting.enable";
+    MR_AM_PREFIX  + "job.node-blacklisting.enable";
 
+  /** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
+  public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
+      MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+  public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+      33;
+  
   /** Enable job recovery.*/
   public static final String MR_AM_JOB_RECOVERY_ENABLE = 
     MR_AM_PREFIX + "job.recovery.enable";

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java Tue Dec 20 23:27:57 2011
@@ -61,4 +61,17 @@ public interface AllocateResponse {
   @Private
   @Unstable
   public abstract void setAMResponse(AMResponse amResponse);
+  
+  
+  /**
+   * Get the number of nodes available in the cluster.
+   * @return the number of available nodes.
+   */
+  @Public
+  @Stable
+  public int getNumClusterNodes();
+  
+  @Private
+  @Unstable
+  public void setNumClusterNodes(int numNodes);
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java Tue Dec 20 23:27:57 2011
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.proto.Yarn
 
 
     
-public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
+public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
+    implements AllocateResponse {
   AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance();
   AllocateResponseProto.Builder builder = null;
   boolean viaProto = false;
@@ -95,7 +96,20 @@ public class AllocateResponsePBImpl exte
       builder.clearAMResponse();
     this.amResponse = aMResponse;
   }
+  
+  @Override
+  public int getNumClusterNodes() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getNumClusterNodes();
+  }
+  
+  @Override
+  public void setNumClusterNodes(int numNodes) {
+    maybeInitBuilder();
+    builder.setNumClusterNodes(numNodes);
+  }
 
+  
   private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
     return new AMResponsePBImpl(p);
   }
@@ -103,7 +117,4 @@ public class AllocateResponsePBImpl exte
   private AMResponseProto convertToProtoFormat(AMResponse t) {
     return ((AMResponsePBImpl)t).getProto();
   }
-
-
-
 }  

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Dec 20 23:27:57 2011
@@ -59,6 +59,7 @@ message AllocateRequestProto {
 
 message AllocateResponseProto {
   optional AMResponseProto AM_response = 1;
+  optional int32 num_cluster_nodes = 2;
 }
 
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Dec 20 23:27:57 2011
@@ -285,6 +285,7 @@ public class ApplicationMasterService ex
       response.setAvailableResources(allocation.getResourceLimit());
       responseMap.put(appAttemptId, response);
       allocateResponse.setAMResponse(response);
+      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
       return allocateResponse;
     }
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Tue Dec 20 23:27:57 2011
@@ -80,6 +80,14 @@ public interface YarnScheduler extends E
   public Resource getMaximumResourceCapability();
 
   /**
+   * Get the number of nodes available in the cluster.
+   * @return the number of available nodes.
+   */
+  @Public
+  @Stable
+  public int getNumClusterNodes();
+  
+  /**
    * The main api between the ApplicationMaster and the Scheduler.
    * The ApplicationMaster is updating his future resource requirements
    * and may release containers he doesn't need.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Dec 20 23:27:57 2011
@@ -159,6 +159,7 @@ implements ResourceScheduler, CapacitySc
     return maximumAllocation;
   }
 
+  @Override
   public synchronized int getNumClusterNodes() {
     return numNodeManagers;
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Dec 20 23:27:57 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,7 +35,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.Lock;
@@ -180,6 +178,11 @@ public class FifoScheduler implements Re
   }
 
   @Override
+  public int getNumClusterNodes() {
+    return nodes.size();
+  }
+  
+  @Override
   public Resource getMaximumResourceCapability() {
     return maximumAllocation;
   }