You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/21 00:27:57 UTC
svn commit: r1221524 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Author: vinodkv
Date: Tue Dec 20 23:27:57 2011
New Revision: 1221524
URL: http://svn.apache.org/viewvc?rev=1221524&view=rev
Log:
MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the number of nodes blacklisted crosses a threshold. Contributed by Siddharth Seth.
svn merge -c 1221523 --ignore-ancestry ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Dec 20 23:27:57 2011
@@ -294,6 +294,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3376. Fixed Task to ensure it passes reporter to combiners using
old MR api. (Subroto Sanyal via acmurthy)
+ MAPREDUCE-3339. Fixed MR AM to stop considering node blacklisting after the
+ number of nodes blacklisted crosses a threshold. (Siddharth Seth via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Dec 20 23:27:57 2011
@@ -68,6 +68,7 @@ public abstract class RMCommunicator ext
protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped;
protected Thread allocatorThread;
+ @SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
private final ClientService clientService;
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Dec 20 23:27:57 2011
@@ -479,12 +479,16 @@ public class RMContainerAllocator extend
//something changed
recalculateReduceSchedule = true;
}
-
- List<Container> allocatedContainers = new ArrayList<Container>();
- for (Container cont : newContainers) {
- allocatedContainers.add(cont);
+
+ if (LOG.isDebugEnabled()) {
+ for (Container cont : newContainers) {
LOG.debug("Received new Container :" + cont);
+ }
}
+
+ //Called on each allocation. Will know about newly blacklisted/added hosts.
+ computeIgnoreBlacklisting();
+
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Tue Dec 20 23:27:57 2011
@@ -18,15 +18,15 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
+
/**
* Keeps the data structures to send container requests to RM.
*/
@@ -74,9 +76,15 @@ public abstract class RMContainerRequest
private final Set<ContainerId> release = new TreeSet<ContainerId>();
private boolean nodeBlacklistingEnabled;
+ private int blacklistDisablePercent;
+ private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
+ private int blacklistedNodeCount = 0;
+ private int lastClusterNmCount = 0;
+ private int clusterNmCount = 0;
private int maxTaskFailuresPerNode;
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
- private final Set<String> blacklistedNodes = new HashSet<String>();
+ private final Set<String> blacklistedNodes = Collections
+ .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
@@ -122,7 +130,17 @@ public abstract class RMContainerRequest
LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
maxTaskFailuresPerNode =
conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
+ blacklistDisablePercent =
+ conf.getInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
+ MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+ if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
+ throw new YarnException("Invalid blacklistDisablePercent: "
+ + blacklistDisablePercent
+ + ". Should be an integer between 0 and 100 or -1 to disabled");
+ }
+ LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
@@ -134,19 +152,49 @@ public abstract class RMContainerRequest
AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId();
availableResources = response.getAvailableResources();
+ lastClusterNmCount = clusterNmCount;
+ clusterNmCount = allocateResponse.getNumClusterNodes();
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() +
" newContainers=" + response.getAllocatedContainers().size() +
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
- " resourcelimit=" + availableResources);
+ " resourcelimit=" + availableResources +
+ "knownNMs=" + clusterNmCount);
ask.clear();
release.clear();
return response;
}
+  /**
+   * Re-evaluates whether node blacklisting should currently be ignored.
+   *
+   * Does nothing when blacklistDisablePercent is -1, or when neither the
+   * number of blacklisted hosts nor the NodeManager count reported by the RM
+   * (clusterNmCount vs. lastClusterNmCount) has changed. Otherwise
+   * ignoreBlacklisting is set when the blacklisted percentage of the cluster
+   * is at or above blacklistDisablePercent, and cleared when it is below.
+   *
+   * May be incorrect if there are multiple NodeManagers running on a single
+   * host: clusterNmCount is based on NodeManagers, not hosts, while
+   * blacklisting is currently based on hosts.
+   */
+  protected void computeIgnoreBlacklisting() {
+    if (blacklistDisablePercent == -1) {
+      return;
+    }
+    // blacklistedNodes is a concurrent set, so its size may change between
+    // reads. Take one snapshot and use it for the change check, the cached
+    // count, the percentage and the log message so they always agree.
+    final int currentBlacklisted = blacklistedNodes.size();
+    if (blacklistedNodeCount == currentBlacklisted
+        && clusterNmCount == lastClusterNmCount) {
+      return;
+    }
+    blacklistedNodeCount = currentBlacklisted;
+    if (clusterNmCount == 0) {
+      // Avoid dividing by zero before the RM has reported any NodeManagers.
+      LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
+      return;
+    }
+    int val = (int) ((float) blacklistedNodeCount / clusterNmCount * 100);
+    if (val >= blacklistDisablePercent) {
+      // Only log on an actual false -> true transition.
+      if (ignoreBlacklisting.compareAndSet(false, true)) {
+        LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
+            + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+      }
+    } else {
+      // Only log on an actual true -> false transition.
+      if (ignoreBlacklisting.compareAndSet(true, false)) {
+        LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
+            + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
+      }
+    }
+  }
+
protected void containerFailedOnHost(String hostName) {
if (!nodeBlacklistingEnabled) {
return;
@@ -161,8 +209,10 @@ public abstract class RMContainerRequest
LOG.info(failures + " failures on node " + hostName);
if (failures >= maxTaskFailuresPerNode) {
blacklistedNodes.add(hostName);
+ //Even if blacklisting is ignored, continue to remove the host from
+ // the request table. The RM may have additional nodes it can allocate on.
LOG.info("Blacklisted host " + hostName);
-
+
//remove all the requests corresponding to this hostname
for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
: remoteRequestsTable.values()){
@@ -316,7 +366,7 @@ public abstract class RMContainerRequest
}
protected boolean isNodeBlacklisted(String hostname) {
- if (!nodeBlacklistingEnabled) {
+ if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
return false;
}
return blacklistedNodes.contains(hostname);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 20 23:27:57 2011
@@ -488,6 +488,8 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+ conf.setInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
@@ -581,12 +583,183 @@ public class TestRMContainerAllocator {
}
@Test
+ public void testIgnoreBlacklisting() throws Exception {
+ LOG.info("Running testIgnoreBlacklisting");
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+ conf.setInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, 33);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher =
+ (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM[] nodeManagers = new MockNM[10];
+ int nmNum = 0;
+ List<TaskAttemptContainerAssignedEvent> assigned = null;
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ nodeManagers[0].nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
+ MyContainerAllocator allocator =
+ new MyContainerAllocator(rm, conf, appAttemptId, mockJob);
+
+ // Known=1, blacklisted=0, ignore should be false - assign first container
+ assigned =
+ getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
+ nodeManagers[0], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
+ + " ignore blacklisting enabled");
+ // Send a failure event to blacklist node h1 (h2 is blacklisted later)
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+ allocator.sendFailure(f1);
+
+ // Test single node.
+ // Known=1, blacklisted=1, ignore should be true - assign 1
+ assigned =
+ getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
+ nodeManagers[0], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ // Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
+ assigned =
+ getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
+ nodeManagers[1], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ // Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
+ assigned =
+ getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
+ nodeManagers[2], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ // Known=3, blacklisted=1, ignore should be true - assign 1
+ assigned =
+ getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
+ nodeManagers[0], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ // Known=4, blacklisted=1, ignore should be false - assign 1 anyway
+ assigned =
+ getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
+ nodeManagers[3], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ // Test blacklisting re-enabled.
+ // Known=4, blacklisted=1, ignore should be false - no assignment on h1
+ assigned =
+ getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
+ nodeManagers[0], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+ // RMContainerRequestor would have created a replacement request.
+
+ // Blacklist h2
+ ContainerFailedEvent f2 = createFailEvent(jobId, 3, "h2", false);
+ allocator.sendFailure(f2);
+
+ // Test ignore blacklisting re-enabled
+ // Known=4, blacklisted=2, ignore should be true. Should assign 2
+ // containers.
+ assigned =
+ getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
+ nodeManagers[0], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+
+ // Known=4, blacklisted=2, ignore should be true.
+ assigned =
+ getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
+ nodeManagers[1], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ // Test blacklist while ignore blacklisting enabled
+ ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false);
+ allocator.sendFailure(f3);
+
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ // Known=5, blacklisted=3, ignore should be true.
+ assigned =
+ getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
+ nodeManagers[2], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ // Assign on 5 more nodes - to re-enable blacklisting
+ for (int i = 0; i < 5; i++) {
+ nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
+ assigned =
+ getContainerOnHost(jobId, 11 + i, 1024,
+ new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
+ dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+ }
+
+ // Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
+ assigned =
+ getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
+ nodeManagers[2], dispatcher, allocator);
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+ }
+
+  /**
+   * Registers a mock NodeManager with the given ResourceManager and waits
+   * for the dispatcher to drain. Index i registers as host "h(i+1)" on port
+   * 1234, so index 0 becomes "h1:1234".
+   */
+  private MockNM registerNodeManager(int i, MyResourceManager rm,
+      DrainDispatcher dispatcher) throws Exception {
+    final String nodeAddress = "h" + (i + 1) + ":1234";
+    // 10240 is the second registerNode argument - presumably the node's
+    // memory; confirm against MockRM.registerNode.
+    final MockNM registered = rm.registerNode(nodeAddress, 10240);
+    dispatcher.await();
+    return registered;
+  }
+
+  /**
+   * Sends one container request for the given hosts, then runs two
+   * scheduling passes around a heartbeat from mockNM. The first pass is
+   * asserted to assign nothing; the assignments from the second pass are
+   * returned to the caller.
+   */
+  private List<TaskAttemptContainerAssignedEvent> getContainerOnHost(
+      JobId jobId, int taskAttemptId, int memory, String[] hosts,
+      MockNM mockNM, DrainDispatcher dispatcher,
+      MyContainerAllocator allocator) throws Exception {
+    allocator.sendRequest(createReq(jobId, taskAttemptId, memory, hosts));
+
+    // First pass: the request goes out, nothing is assigned yet.
+    List<TaskAttemptContainerAssignedEvent> beforeHeartbeat =
+        allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0,
+        beforeHeartbeat.size());
+
+    // Heartbeat from the required NodeManager.
+    mockNM.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Second pass: collect whatever was assigned after the heartbeat.
+    List<TaskAttemptContainerAssignedEvent> afterHeartbeat =
+        allocator.schedule();
+    dispatcher.await();
+    return afterHeartbeat;
+  }
+
+ @Test
public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+ conf.setInt(
+ MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 20 23:27:57 2011
@@ -348,8 +348,14 @@ public interface MRJobConfig {
/** Enable blacklisting of nodes in the job.*/
public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
- MR_AM_PREFIX + "job.node.blacklisting.enable";
+ MR_AM_PREFIX + "job.node-blacklisting.enable";
+ /** Ignore blacklisting if a certain percentage of nodes have been blacklisted */
+ public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT =
+ MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+ public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+ 33;
+
/** Enable job recovery.*/
public static final String MR_AM_JOB_RECOVERY_ENABLE =
MR_AM_PREFIX + "job.recovery.enable";
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java Tue Dec 20 23:27:57 2011
@@ -61,4 +61,17 @@ public interface AllocateResponse {
@Private
@Unstable
public abstract void setAMResponse(AMResponse amResponse);
+
+
+ /**
+ * Get the number of nodes available in the cluster.
+ * @return the available node count.
+ */
+ @Public
+ @Stable
+ public int getNumClusterNodes();
+
+ @Private
+ @Unstable
+ public void setNumClusterNodes(int numNodes);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java Tue Dec 20 23:27:57 2011
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.proto.Yarn
-public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto> implements AllocateResponse {
+public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
+ implements AllocateResponse {
AllocateResponseProto proto = AllocateResponseProto.getDefaultInstance();
AllocateResponseProto.Builder builder = null;
boolean viaProto = false;
@@ -95,7 +96,20 @@ public class AllocateResponsePBImpl exte
builder.clearAMResponse();
this.amResponse = aMResponse;
}
+
+  /** Returns num_cluster_nodes from the proto or the builder, whichever is current. */
+  @Override
+  public int getNumClusterNodes() {
+    if (viaProto) {
+      return proto.getNumClusterNodes();
+    }
+    return builder.getNumClusterNodes();
+  }
+
+  /** Stores the cluster node count on the builder. */
+  @Override
+  public void setNumClusterNodes(int numNodes) {
+    // presumably makes sure a usable builder exists before mutating - the
+    // helper is not visible in this hunk; confirm in the class
+    maybeInitBuilder();
+    builder.setNumClusterNodes(numNodes);
+  }
+
private AMResponsePBImpl convertFromProtoFormat(AMResponseProto p) {
return new AMResponsePBImpl(p);
}
@@ -103,7 +117,4 @@ public class AllocateResponsePBImpl exte
private AMResponseProto convertToProtoFormat(AMResponse t) {
return ((AMResponsePBImpl)t).getProto();
}
-
-
-
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Tue Dec 20 23:27:57 2011
@@ -59,6 +59,7 @@ message AllocateRequestProto {
message AllocateResponseProto {
optional AMResponseProto AM_response = 1;
+ optional int32 num_cluster_nodes = 2;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Dec 20 23:27:57 2011
@@ -285,6 +285,7 @@ public class ApplicationMasterService ex
response.setAvailableResources(allocation.getResourceLimit());
responseMap.put(appAttemptId, response);
allocateResponse.setAMResponse(response);
+ allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
return allocateResponse;
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Tue Dec 20 23:27:57 2011
@@ -80,6 +80,14 @@ public interface YarnScheduler extends E
public Resource getMaximumResourceCapability();
/**
+ * Get the number of nodes available in the cluster.
+ * @return the number of available nodes.
+ */
+ @Public
+ @Stable
+ public int getNumClusterNodes();
+
+ /**
* The main api between the ApplicationMaster and the Scheduler.
* The ApplicationMaster is updating his future resource requirements
* and may release containers he doesn't need.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Dec 20 23:27:57 2011
@@ -159,6 +159,7 @@ implements ResourceScheduler, CapacitySc
return maximumAllocation;
}
+ @Override
public synchronized int getNumClusterNodes() {
return numNodeManagers;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1221524&r1=1221523&r2=1221524&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Dec 20 23:27:57 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,7 +35,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
@@ -180,6 +178,11 @@ public class FifoScheduler implements Re
}
@Override
+ public int getNumClusterNodes() {
+ return nodes.size();
+ }
+
+ @Override
public Resource getMaximumResourceCapability() {
return maximumAllocation;
}