You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2011/06/13 10:56:24 UTC

svn commit: r1135062 - in /hadoop/common/branches/MR-279/mapreduce: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/ m...

Author: sharad
Date: Mon Jun 13 08:56:23 2011
New Revision: 1135062

URL: http://svn.apache.org/viewvc?rev=1135062&view=rev
Log:
Job level node blacklisting.

Added:
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Mon Jun 13 08:56:23 2011
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+    Job level node blacklisting. (sharad)
 
     Fixing the bug which was causing FAILED jobs to be displayed as COMPLETED
     on the RM UI. (vinodkv)

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java Mon Jun 13 08:56:23 2011
@@ -67,4 +67,7 @@ public interface AMConstants {
   public static final float DEFAULT_REDUCE_PREEMPTION_LIMIT = 0.5f;
   public static final String REDUCE_PREEMPTION_LIMIT = MRConstants.YARN_MR_PREFIX
   + "reduce.preemption.limit";
+  /** Boolean config key enabling job-level node blacklisting (RMContainerRequestor defaults it to true). */
+  public static final String NODE_BLACKLISTING_ENABLE = MRConstants.YARN_MR_PREFIX
+  + "node.blacklisting.enable";
 }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Mon Jun 13 08:56:23 2011
@@ -46,6 +46,10 @@ public interface TaskAttempt {
    */
   ContainerId getAssignedContainerID();
 
+  /**If a container is assigned then return its container manager address (typically "host:port"), otherwise null.
+   */
+  String getAssignedContainerMgrAddress();
+  
   /**If container Assigned then return the node's http address, otherwise null.
    */
   String getNodeHttpAddress();

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Mon Jun 13 08:56:23 2011
@@ -694,6 +694,16 @@ public abstract class TaskAttemptImpl im
   }
 
   @Override
+  public String getAssignedContainerMgrAddress() { // per TaskAttempt: null when no container is assigned
+    readLock.lock(); // read under the attempt's read lock, as getLaunchTime() does
+    try {
+      return containerMgrAddress;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public long getLaunchTime() {
     readLock.lock();
     try {

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Jun 13 08:56:23 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -745,6 +746,13 @@ public abstract class TaskImpl implement
     public TaskState transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+      if (attempt.getAssignedContainerMgrAddress() != null) {
+        //container was assigned
+        task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), 
+            attempt.getAssignedContainerMgrAddress()));
+      }
+      
       if (task.failedAttempts < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
             ((TaskTAttemptEvent) event).getTaskAttemptID(), 

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAllocator.java Mon Jun 13 08:56:23 2011
@@ -25,7 +25,8 @@ public interface ContainerAllocator exte
   enum EventType {
 
     CONTAINER_REQ,
-    CONTAINER_DEALLOCATE
+    CONTAINER_DEALLOCATE,
+    CONTAINER_FAILED
   }
 
 }

Added: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java?rev=1135062&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerFailedEvent.java Mon Jun 13 08:56:23 2011
@@ -0,0 +1,18 @@
+package org.apache.hadoop.mapreduce.v2.app.rm;
+// NOTE(review): this new file has no Apache license header, unlike typical Hadoop sources — TODO add.
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class ContainerFailedEvent extends ContainerAllocatorEvent {
+  // CONTAINER_FAILED event: TaskImpl sends it when a failed attempt had a container; RMContainerAllocator counts it per host.
+  private final String contMgrAddress; // container manager address the failed attempt ran on
+  // attemptID: the failed attempt; contMgrAddr: its non-null container manager address.
+  public ContainerFailedEvent(TaskAttemptId attemptID, String contMgrAddr) {
+    super(attemptID, ContainerAllocator.EventType.CONTAINER_FAILED);
+    this.contMgrAddress = contMgrAddr;
+  }
+
+  public String getContMgrAddress() { // may include ":port"; RMContainerAllocator.getHost strips it
+    return contMgrAddress;
+  }
+
+}

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Jun 13 08:56:23 2011
@@ -232,9 +232,23 @@ public class RMContainerAllocator extend
         LOG.error("Could not deallocate container for task attemptId " + 
             aId);
       }
+    } else if (
+        event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
+      ContainerFailedEvent fEv = (ContainerFailedEvent) event;
+      String host = getHost(fEv.getContMgrAddress());
+      containerFailedOnHost(host);
     }
   }
 
+  private static String getHost(String contMgrAddress) { // strips ":port" from a container manager address
+    String host = contMgrAddress;
+    String[] hostport = host.split(":"); // NOTE(review): throws NullPointerException on null input
+    if (hostport.length == 2) { // only "host:port" is split; no colon or several colons (e.g. IPv6) returns input unchanged
+      host = hostport[0];
+    }
+    return host;
+  }
+
   private void preemptReducesIfNeeded() {
     if (reduceResourceReqt == 0) {
       return; //no reduces
@@ -589,11 +603,7 @@ public class RMContainerAllocator extend
       ContainerRequest assigned = null;
       while (assigned == null && maps.size() > 0
           && allocated.getResource().getMemory() >= mapResourceReqt) {
-        String host = allocated.getContainerManagerAddress();
-        String[] hostport = host.split(":");
-        if (hostport.length == 2) {
-          host = hostport[0];
-        }
+        String host = getHost(allocated.getContainerManagerAddress());
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
           LOG.info("Host matched to the request list " + host);

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Mon Jun 13 08:56:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,7 +29,10 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -69,6 +73,11 @@ public abstract class RMContainerRequest
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
   private final Set<Container> release = new TreeSet<Container>(); 
 
+  private boolean nodeBlacklistingEnabled; // from NODE_BLACKLISTING_ENABLE, set in init()
+  private int maxTaskFailuresPerNode; // failures before a host is blacklisted, set in init()
+  private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>(); // host -> failures so far
+  private final Set<String> blacklistedNodes = new HashSet<String>(); // hosts that hit the limit
+
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -89,7 +98,18 @@ public abstract class RMContainerRequest
       this.priority = priority;
     }
   }
-  
+  /** Reads the node-blacklisting switch and per-node failure limit from the configuration and logs them. */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    nodeBlacklistingEnabled = 
+      conf.getBoolean(AMConstants.NODE_BLACKLISTING_ENABLE, true); // on unless explicitly disabled
+    LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
+    maxTaskFailuresPerNode = 
+      conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3); // reuses the per-tracker limit key; default 3
+    LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+  }
+
   protected abstract void heartbeat() throws Exception;
 
   protected List<Container> makeRemoteRequest() throws YarnRemoteException {
@@ -118,6 +138,39 @@ public abstract class RMContainerRequest
     return allContainers;
   }
 
+  protected void containerFailedOnHost(String hostName) { // records one failure; blacklists at maxTaskFailuresPerNode
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
+    if (blacklistedNodes.contains(hostName)) {
+      LOG.info("Host " + hostName + " is already blacklisted.");
+      return; //already blacklisted
+    }
+    Integer failures = nodeFailures.remove(hostName); // re-put below unless the host gets blacklisted
+    failures = failures == null ? 0 : failures;
+    failures++;
+    LOG.info(failures + " failures on node " + hostName);
+    if (failures >= maxTaskFailuresPerNode) {
+      blacklistedNodes.add(hostName);
+      LOG.info("Blacklisted host " + hostName);
+      // NOTE(review): blacklistedNodes is not checked when new requests are added — verify hosts can't be re-requested.
+      //remove all the requests corresponding to this hostname
+      for (Map<String, Map<Resource, ResourceRequest>> remoteRequests 
+          : remoteRequestsTable.values()){
+        //remove from host
+        Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName); // drop host-level entries
+        if (reqMap != null) {
+          for (ResourceRequest req : reqMap.values()) {
+            ask.remove(req); // withdraw them from the pending ask set too
+          }
+        }
+        //TODO: remove from rack (rack-level requests are not adjusted yet)
+      }
+    } else {
+      nodeFailures.put(hostName, failures);
+    }
+  }
+
   protected Resource getAvailableResources() {
     return availableResources;
   }

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Mon Jun 13 08:56:23 2011
@@ -242,6 +242,11 @@ public class MockJobs extends MockApps {
       public List<String> getDiagnostics() {
         return diags;
       }
+
+      @Override
+      public String getAssignedContainerMgrAddress() {
+        return "localhost:9998"; // fixed fake address in host:port form
+      }
     };
   }
 

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Mon Jun 13 08:56:23 2011
@@ -697,6 +697,11 @@ public class TestRuntimeEstimators {
     public long getFinishTime() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public String getAssignedContainerMgrAddress() {
+      throw new UnsupportedOperationException("Not supported yet."); // same placeholder as other unused mock methods
+    }
   }
 
   static class MockClock implements Clock {

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1135062&r1=1135061&r2=1135062&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon Jun 13 08:56:23 2011
@@ -78,6 +78,12 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
+  public String getAssignedContainerMgrAddress() {
+    // TODO Container details need to be part of some history event to be able to render the log directory.
+    return null; // address is not available from the recorded attempt info, so always null
+  }
+
+  @Override
   public String getNodeHttpAddress() {
     return attemptInfo.getHostname() + ":" + attemptInfo.getHttpPort();
   }