You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2011/06/13 10:56:24 UTC
svn commit: r1135062 - in /hadoop/common/branches/MR-279/mapreduce: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/
m...
Author: sharad
Date: Mon Jun 13 08:56:23 2011
New Revision: 1135062
URL: http://svn.apache.org/viewvc?rev=1135062&view=rev
Log:
Job level node blacklisting.
Added:
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java
Modified:
hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Mon Jun 13 08:56:23 2011
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Job level node blacklisting. (sharad)
Fixing the bug which was causing FAILED jobs to be displayed as COMPLETED
on the RM UI. (vinodkv)
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java Mon Jun 13 08:56:23 2011
@@ -67,4 +67,7 @@ public interface AMConstants {
public static final float DEFAULT_REDUCE_PREEMPTION_LIMIT = 0.5f;
public static final String REDUCE_PREEMPTION_LIMIT = MRConstants.YARN_MR_PREFIX
+ "reduce.preemption.limit";
+ /** Config key for job-level node blacklisting; RMContainerRequestor.init() reads it as a boolean defaulting to true. */
+ public static final String NODE_BLACKLISTING_ENABLE = MRConstants.YARN_MR_PREFIX
+ + "node.blacklisting.enable";
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Mon Jun 13 08:56:23 2011
@@ -46,6 +46,10 @@ public interface TaskAttempt {
*/
ContainerId getAssignedContainerID();
+ /** Returns the address of the container manager if a container has been assigned, otherwise null.
+ */
+ String getAssignedContainerMgrAddress();
+
/**If container Assigned then return the node's http address, otherwise null.
*/
String getNodeHttpAddress();
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Mon Jun 13 08:56:23 2011
@@ -694,6 +694,16 @@ public abstract class TaskAttemptImpl im
}
@Override
+ public String getAssignedContainerMgrAddress() { // returns the containerMgrAddress field
+ readLock.lock(); // hold readLock for the duration of the field read
+ try {
+ return containerMgrAddress;
+ } finally {
+ readLock.unlock(); // released on every exit path
+ }
+ }
+
+ @Override
public long getLaunchTime() {
readLock.lock();
try {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Jun 13 08:56:23 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -745,6 +746,13 @@ public abstract class TaskImpl implement
public TaskState transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+ if (attempt.getAssignedContainerMgrAddress() != null) {
+ //container was assigned
+ task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
+ attempt.getAssignedContainerMgrAddress()));
+ }
+
if (task.failedAttempts < task.maxAttempts) {
task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(),
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java Mon Jun 13 08:56:23 2011
@@ -25,7 +25,8 @@ public interface ContainerAllocator exte
enum EventType {
CONTAINER_REQ,
- CONTAINER_DEALLOCATE
+ CONTAINER_DEALLOCATE,
+ CONTAINER_FAILED // added in this revision; the type ContainerFailedEvent is constructed with
}
}
Added: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java?rev=1135062&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java Mon Jun 13 08:56:23 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+/** Allocator event of type CONTAINER_FAILED that carries the container manager address of the failed attempt. */
+public class ContainerFailedEvent extends ContainerAllocatorEvent {
+ /** Container manager address supplied at construction; never reassigned. */
+ private final String contMgrAddress;
+ /** Creates a CONTAINER_FAILED event for attemptID with the given container manager address. */
+ public ContainerFailedEvent(TaskAttemptId attemptID, String contMgrAddr) {
+ super(attemptID, ContainerAllocator.EventType.CONTAINER_FAILED);
+ this.contMgrAddress = contMgrAddr;
+ }
+ /** Returns the container manager address given to the constructor. */
+ public String getContMgrAddress() {
+ return contMgrAddress;
+ }
+
+}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jun 13 08:56:23 2011
@@ -232,9 +232,23 @@ public class RMContainerAllocator extend
LOG.error("Could not deallocate container for task attemptId " +
aId);
}
+ } else if (
+ event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
+ ContainerFailedEvent fEv = (ContainerFailedEvent) event;
+ String host = getHost(fEv.getContMgrAddress());
+ containerFailedOnHost(host);
}
}
+ private static String getHost(String contMgrAddress) { // extracts the host part of a "host:port" address
+ String host = contMgrAddress;
+ String[] hostport = host.split(":");
+ if (hostport.length == 2) { // only a two-part split is treated as host:port; anything else is returned unchanged
+ host = hostport[0];
+ }
+ return host;
+ }
+
private void preemptReducesIfNeeded() {
if (reduceResourceReqt == 0) {
return; //no reduces
@@ -589,11 +603,7 @@ public class RMContainerAllocator extend
ContainerRequest assigned = null;
while (assigned == null && maps.size() > 0
&& allocated.getResource().getMemory() >= mapResourceReqt) {
- String host = allocated.getContainerManagerAddress();
- String[] hostport = host.split(":");
- if (hostport.length == 2) {
- host = hostport[0];
- }
+ String host = getHost(allocated.getContainerManagerAddress());
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
LOG.info("Host matched to the request list " + host);
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Mon Jun 13 08:56:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,7 +29,10 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -69,6 +73,11 @@ public abstract class RMContainerRequest
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
private final Set<Container> release = new TreeSet<Container>();
+ private boolean nodeBlacklistingEnabled; // set in init() from AMConstants.NODE_BLACKLISTING_ENABLE (default true)
+ private int maxTaskFailuresPerNode; // set in init() from MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER (default 3)
+ private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>(); // failure count per host name, kept only while the host is not blacklisted
+ private final Set<String> blacklistedNodes = new HashSet<String>(); // host names whose failure count reached maxTaskFailuresPerNode
+
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
}
@@ -89,7 +98,18 @@ public abstract class RMContainerRequest
this.priority = priority;
}
}
-
+
+ @Override
+ public void init(Configuration conf) { // reads the node-blacklisting settings after the superclass init
+ super.init(conf);
+ nodeBlacklistingEnabled =
+ conf.getBoolean(AMConstants.NODE_BLACKLISTING_ENABLE, true); // second argument is the default: enabled when the key is absent
+ LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
+ maxTaskFailuresPerNode =
+ conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3); // second argument is the default: 3 when the key is absent
+ LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+ }
+
protected abstract void heartbeat() throws Exception;
protected List<Container> makeRemoteRequest() throws YarnRemoteException {
@@ -118,6 +138,39 @@ public abstract class RMContainerRequest
return allContainers;
}
+ protected void containerFailedOnHost(String hostName) { // records one failure for hostName and blacklists it once maxTaskFailuresPerNode is reached
+ if (!nodeBlacklistingEnabled) {
+ return; // feature switched off: failures are not counted at all
+ }
+ if (blacklistedNodes.contains(hostName)) {
+ LOG.info("Host " + hostName + " is already blacklisted.");
+ return; //already blacklisted
+ }
+ Integer failures = nodeFailures.remove(hostName); // entry is put back below unless the host gets blacklisted
+ failures = failures == null ? 0 : failures; // no entry yet means this is the first failure on the host
+ failures++;
+ LOG.info(failures + " failures on node " + hostName);
+ if (failures >= maxTaskFailuresPerNode) {
+ blacklistedNodes.add(hostName);
+ LOG.info("Blacklisted host " + hostName);
+
+ //remove all the requests corresponding to this hostname
+ for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
+ : remoteRequestsTable.values()){
+ //remove from host
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
+ if (reqMap != null) {
+ for (ResourceRequest req : reqMap.values()) {
+ ask.remove(req); // also drop the request from the ask set
+ }
+ }
+ //TODO: remove from rack
+ }
+ } else {
+ nodeFailures.put(hostName, failures); // below the limit: store the updated count
+ }
+ }
+
protected Resource getAvailableResources() {
return availableResources;
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Mon Jun 13 08:56:23 2011
@@ -242,6 +242,11 @@ public class MockJobs extends MockApps {
public List<String> getDiagnostics() {
return diags;
}
+
+ @Override
+ public String getAssignedContainerMgrAddress() {
+ return "localhost:9998"; // fixed host:port value, the same for every call
+ }
};
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Mon Jun 13 08:56:23 2011
@@ -697,6 +697,11 @@ public class TestRuntimeEstimators {
public long getFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public String getAssignedContainerMgrAddress() { // unimplemented stub, like the neighbouring getFinishTime()
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
static class MockClock implements Clock {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon Jun 13 08:56:23 2011
@@ -78,6 +78,12 @@ public class CompletedTaskAttempt implem
}
@Override
+ public String getAssignedContainerMgrAddress() {
+ // TODO Container details need to be part of some historyEvent to be able to render the log directory.
+ return null; // always null for now (see TODO above)
+ }
+
+ @Override
public String getNodeHttpAddress() {
return attemptInfo.getHostname() + ":" + attemptInfo.getHttpPort();
}