You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/15 03:52:50 UTC

git commit: TEZ-804. Handle node loss/bad nodes (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 720adc32d -> 206a1ed69


TEZ-804. Handle node loss/bad nodes (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/206a1ed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/206a1ed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/206a1ed6

Branch: refs/heads/master
Commit: 206a1ed69e386fa561387a09cdbcce40db14dbb1
Parents: 720adc3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Feb 14 18:52:37 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Feb 14 18:52:37 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  19 +-
 .../app/rm/AMSchedulerEventTALaunchRequest.java |   2 +-
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  36 +++-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  20 +-
 .../tez/dag/app/rm/TezAMRMClientAsync.java      |   6 +
 .../dag/app/rm/container/AMContainerImpl.java   |   4 +
 .../org/apache/tez/dag/app/rm/node/AMNode.java  |   1 +
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |  41 +++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  36 +++-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 190 ++++++++++++++++++-
 .../tez/dag/app/rm/node/TestAMNodeMap.java      | 123 ++++++++++--
 .../org/apache/tez/mapreduce/TestMRRJobs.java   |   2 +-
 .../org/apache/tez/test/TestFaultTolerance.java |   1 +
 13 files changed, 445 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 81f5c5a..368406a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -70,6 +71,8 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
@@ -1099,8 +1102,22 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       }
 
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
+      TezTaskAttemptID failedAttemptId = castEvent.getTaskAttemptID();
+      TaskAttempt failedAttempt = task.getAttempt(failedAttemptId);
+      ContainerId containerId = failedAttempt.getAssignedContainerID();
+      if (containerId != null) {
+        AMContainer amContainer = task.appContext.getAllContainers().
+            get(containerId);
+        if (amContainer != null) {
+          // inform the node about failure
+          task.eventHandler.handle(
+              new AMNodeEventTaskAttemptEnded(amContainer.getContainer().getNodeId(), 
+                  containerId, failedAttemptId, true));
+        }
+      }
+      
       if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
-          !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+          !failedAttemptId.equals(task.successfulAttempt)) {
         // don't allow a different task attempt to override a previous
         // succeeded state
         return TaskStateInternal.SUCCEEDED;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index bacb83e..14f303d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -26,7 +26,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 
 public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
 
-  // TODO Get rid of remoteTask from here. Can be forgottent after it has been assigned.
+  // TODO Get rid of remoteTask from here. Can be forgotten after it has been assigned.
   //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
 
   private final TezTaskAttemptID attemptId;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index c6bcfcb..52173f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -64,6 +66,7 @@ import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /* TODO not yet updating cluster nodes on every allocate response
@@ -144,6 +147,8 @@ public class TaskScheduler extends AbstractService
   Map<ContainerId, HeldContainer> heldContainers =
       new HashMap<ContainerId, HeldContainer>();
   
+  Set<NodeId> blacklistedNodes = Sets.newConcurrentHashSet();
+  
   Resource totalResources = Resource.newInstance(0, 0);
   Resource allocatedResources = Resource.newInstance(0, 0);
   
@@ -777,6 +782,11 @@ public class TaskScheduler extends AbstractService
     return totalResources;
   }
 
+  public synchronized void blacklistNode(NodeId nodeId) {
+    amRmClient.addNodeToBlacklist(nodeId);
+    blacklistedNodes.add(nodeId);
+  }
+  
   public synchronized void allocateTask(
       Object task,
       Resource capability,
@@ -1238,7 +1248,31 @@ public class TaskScheduler extends AbstractService
     }
     for (Entry<CookieContainerRequest, Container> entry : assignedContainers
         .entrySet()) {
-      informAppAboutAssignment(entry.getKey(), entry.getValue());
+      Container container = entry.getValue();
+      // check for blacklisted nodes. There may be race conditions between
+      // setting blacklist and receiving allocations
+      if (blacklistedNodes.contains(container.getNodeId())) {
+        CookieContainerRequest request = entry.getKey();
+        Object task = getTask(request);
+        Object clientCookie = request.getCookie().getAppCookie();
+        LOG.info("Container: " + container.getId() + 
+            " allocated on blacklisted node: " + container.getNodeId() + 
+            " for task: " + task);
+        Object deAllocTask = deallocateContainer(container.getId());
+        assert deAllocTask.equals(task);
+        // It's ok to submit the same request again because the RM will not give us
+        // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
+        // and so it's better to give the RM the full information.
+        allocateTask(task, request.getCapability(), 
+            (request.getNodes() == null ? null : 
+            request.getNodes().toArray(new String[request.getNodes().size()])), 
+            (request.getRacks() == null ? null : 
+              request.getRacks().toArray(new String[request.getRacks().size()])), 
+            request.getPriority(), 
+            request.getCookie().getContainerSignature(), clientCookie);
+      } else {
+        informAppAboutAssignment(entry.getKey(), container);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 5473e77..550b3a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
 import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
@@ -135,7 +136,9 @@ public class TaskSchedulerEventHandler extends AbstractService
     case S_CONTAINERS_ALLOCATED:
       break;
     case S_CONTAINER_COMPLETED:
+      break;
     case S_NODE_BLACKLISTED:
+      handleNodeBlacklist((AMSchedulerEventNodeBlacklisted)sEvent);
       break;
     case S_NODE_UNHEALTHY:
       break;
@@ -168,6 +171,9 @@ public class TaskSchedulerEventHandler extends AbstractService
     eventHandler.handle(event);
   }
 
+  private void handleNodeBlacklist(AMSchedulerEventNodeBlacklisted event) {
+    taskScheduler.blacklistNode(event.getNodeId());
+  }
 
   private void handleContainerDeallocate(
                                   AMSchedulerEventDeallocateContainer event) {
@@ -325,8 +331,6 @@ public class TaskSchedulerEventHandler extends AbstractService
   
   @Override
   public synchronized void serviceStart() {
-    // FIXME hack alert how is this supposed to support multiple DAGs?
-    // Answer: this is shared across dags. need job==app-dag-master
     InetSocketAddress serviceAddr = clientService.getBindAddress();
     dagAppMaster = appContext.getAppMaster();
     taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
@@ -395,7 +399,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     // because the deallocateTask downcall may have raced with the
     // taskAllocated() upcall
     assert task.equals(taskAttempt);
-    
+ 
     if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
       sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
           event.getContainerContext()));
@@ -409,12 +413,18 @@ public class TaskSchedulerEventHandler extends AbstractService
   @Override
   public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
     // Inform the Containers about completion.
-    sendEvent(new AMContainerEventCompleted(containerStatus));
+    AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
+    if (amContainer != null) {
+      sendEvent(new AMContainerEventCompleted(containerStatus));
+    }
   }
 
   @Override
   public synchronized void containerBeingReleased(ContainerId containerId) {
-    sendEvent(new AMContainerEventStopRequest(containerId));
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    if (amContainer != null) {
+      sendEvent(new AMContainerEventStopRequest(containerId));
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index 487c57a..e90ed0f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -80,6 +81,11 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
     }
     return iter.next();
   }
+  
+  // Remove after YARN-1723 is fixed
+  public synchronized void addNodeToBlacklist(NodeId nodeId) {
+    client.updateBlacklist(Collections.singletonList(nodeId.getHost()), null);
+  }
 
   @Override
   public synchronized void addContainerRequest(T req) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1ce0d89..d782be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -717,6 +717,10 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
 
+      if (container.nodeFailed) {
+        // ignore duplicates
+        return;
+      }
       container.nodeFailed = true;
       String errorMessage = null;
       if (cEvent instanceof DiagnosableEvent) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
index 37f3f1b..1c34816 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java
@@ -32,4 +32,5 @@ public interface AMNode extends EventHandler<AMNodeEvent> {
 
   public boolean isUnhealthy();
   public boolean isBlacklisted();
+  public boolean isUsable();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 9ee8a3c..256eb1f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm.node;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -41,6 +42,10 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 public class AMNodeImpl implements AMNode {
 
@@ -51,16 +56,18 @@ public class AMNodeImpl implements AMNode {
   private final NodeId nodeId;
   private final AppContext appContext;
   private final int maxTaskFailuresPerNode;
-  private int numFailedTAs = 0;
-  private int numSuccessfulTAs = 0;
   private boolean blacklistingEnabled;
   private boolean ignoreBlacklisting = false;
+  private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet();
 
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
 
-  private final List<ContainerId> containers = new LinkedList<ContainerId>();
-
+  @VisibleForTesting
+  final List<ContainerId> containers = new LinkedList<ContainerId>();
+  int numFailedTAs = 0;
+  int numSuccessfulTAs = 0;
+  
   //Book-keeping only. In case of Health status change.
   private final List<ContainerId> pastContainers = new LinkedList<ContainerId>();
 
@@ -174,6 +181,7 @@ public class AMNodeImpl implements AMNode {
     this.nodeId = nodeId;
     this.appContext = appContext;
     this.eventHandler = eventHandler;
+    this.blacklistingEnabled = blacklistingEnabled;
     this.maxTaskFailuresPerNode = maxTaskFailuresPerNode;
     this.stateMachine = stateMachineFactory.make(this);
     // TODO Handle the case where a node is created due to the RM reporting it's
@@ -237,6 +245,9 @@ public class AMNodeImpl implements AMNode {
   }
 
   protected void blacklistSelf() {
+    for (ContainerId c : containers) {
+      sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
+    }
     sendEvent(new AMNodeEvent(getNodeId(),
         AMNodeEventType.N_NODE_WAS_BLACKLISTED));
     sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
@@ -274,11 +285,17 @@ public class AMNodeImpl implements AMNode {
     public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) {
       AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent;
       if (event.failed()) {
-        node.numFailedTAs++;
-        boolean shouldBlacklist = node.shouldBlacklistNode();
-        if (shouldBlacklist) {
-          node.blacklistSelf();
-          return AMNodeState.BLACKLISTED;
+        // ignore duplicate attempt ids
+        if (node.failedAttemptIds.add(event.getTaskAttemptId())) {
+          // new failed task attempt on node
+          node.numFailedTAs++;
+          boolean shouldBlacklist = node.shouldBlacklistNode();
+          if (shouldBlacklist) {
+            LOG.info("Too many task attempt failures. " + 
+                     "Blacklisting node: " + node.getNodeId());
+            node.blacklistSelf();
+            return AMNodeState.BLACKLISTED;
+          }
         }
       }
       return AMNodeState.ACTIVE;
@@ -409,6 +426,7 @@ public class AMNodeImpl implements AMNode {
     }
   }
 
+  @Override
   public boolean isBlacklisted() {
     this.readLock.lock();
     try {
@@ -417,4 +435,9 @@ public class AMNodeImpl implements AMNode {
       this.readLock.unlock();
     }
   }
+  
+  @Override
+  public boolean isUsable() {
+    return !(isUnhealthy() || isBlacklisted());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 451ba07..b18e2f6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,7 +37,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -57,6 +62,10 @@ import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.node.AMNodeEvent;
+import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.app.rm.node.AMNodeMap;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -92,6 +101,10 @@ public class TestTaskImpl {
   private String javaOpts;
   private boolean leafVertex;
   private ContainerContext containerContext;
+  private ContainerId mockContainerId;
+  private Container mockContainer;
+  private AMContainer mockAMContainer;
+  private NodeId mockNodeId;
 
   private MockTaskImpl mockTask;
   
@@ -117,7 +130,15 @@ public class TestTaskImpl {
     appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     dagId = TezDAGID.getInstance(appId, 1);
     vertexId = TezVertexID.getInstance(dagId, 1);
-    appContext = mock(AppContext.class);
+    appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    mockContainerId = mock(ContainerId.class);
+    mockContainer = mock(Container.class);
+    mockAMContainer = mock(AMContainer.class);
+    mockNodeId = mock(NodeId.class);
+    when(mockContainer.getId()).thenReturn(mockContainerId);
+    when(mockContainer.getNodeId()).thenReturn(mockNodeId);
+    when(mockAMContainer.getContainer()).thenReturn(mockContainer);
+    when(appContext.getAllContainers().get(mockContainerId)).thenReturn(mockAMContainer);
     taskResource = Resource.newInstance(1024, 1);
     localResources = new HashMap<String, LocalResource>();
     environment = new HashMap<String, String>();
@@ -127,6 +148,7 @@ public class TestTaskImpl {
         environment, javaOpts);
     Vertex vertex = mock(Vertex.class);
     eventHandler = new TestEventHandler();
+    
     mockTask = new MockTaskImpl(vertexId, partition,
         eventHandler, conf, taskAttemptListener, clock,
         taskHeartbeatHandler, appContext, leafVertex, locationHint,
@@ -477,13 +499,16 @@ public class TestTaskImpl {
     // The task should now have succeeded
     assertTaskSucceededState();
 
+    eventHandler.events.clear();
     // Now fail the attempt after it has succeeded
     mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
         .getID(), TaskEventType.T_ATTEMPT_FAILED));
 
     // The task should still be in the scheduled state
     assertTaskScheduledState();
-    Event event = eventHandler.events.get(eventHandler.events.size()-1);
+    Event event = eventHandler.events.get(0);
+    Assert.assertEquals(AMNodeEventType.N_TA_ENDED, event.getType());
+    event = eventHandler.events.get(eventHandler.events.size()-1);
     Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
   }
 
@@ -494,7 +519,7 @@ public class TestTaskImpl {
 
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
     private Vertex vertex;
-
+    
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
         TaskAttemptListener taskAttemptListener, Clock clock,
@@ -583,6 +608,11 @@ public class TestTaskImpl {
     public TaskAttemptState getStateNoLock() {
       return state;
     }
+    
+    @Override
+    public ContainerId getAssignedContainerID() {
+      return mockContainerId;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index c8e8d1b..a01412c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -86,7 +86,7 @@ public class TestTaskScheduler {
       RecordFactoryProvider.getRecordFactory(null);
 
   @SuppressWarnings({ "unchecked" })
-  @Test
+  @Test(timeout=10000)
   public void testTaskSchedulerNoReuse() throws Exception {
     RackResolver.init(new YarnConfiguration());
     TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
@@ -346,8 +346,98 @@ public class TestTaskScheduler {
     // no other statuses returned
     verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+    
+    // verify blacklisting
+    verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any());
+    String badHost = "host6";
+    NodeId badNodeId = mock(NodeId.class);
+    when(badNodeId.getHost()).thenReturn(badHost);
+    scheduler.blacklistNode(badNodeId);
+    verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
+    Object mockTask4 = mock(Object.class);
+    Object mockCookie4 = mock(Object.class);
+    scheduler.allocateTask(mockTask4, mockCapability, null,
+        null, mockPriority, null, mockCookie4);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request4 = requestCaptor.getValue();
+    anyContainers.clear();
+    anyContainers.add(request4);
+    Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
+    when(mockContainer5.getNodeId()).thenReturn(badNodeId);
+    ContainerId mockCId5 = mock(ContainerId.class);
+    when(mockContainer5.getId()).thenReturn(mockCId5);
+    containers.clear();
+    containers.add(mockContainer5);
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(),
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
 
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
 
+        });
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    // no new allocation
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    // verify blacklisted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId5);
+    verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
+    // verify request added back
+    verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request5 = requestCaptor.getValue();
+    anyContainers.clear();
+    anyContainers.add(request5);
+    Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
+    ContainerId mockCId6 = mock(ContainerId.class);
+    when(mockContainer6.getId()).thenReturn(mockCId6);
+    containers.clear();
+    containers.add(mockContainer6);
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(),
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    scheduler.onContainersAllocated(containers);
+    drainableAppCallback.drain();
+    // new allocation
+    verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+    verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask4, true));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId6);
+    verify(mockRMClient).releaseAssignedContainer(mockCId6);
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+    
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);
     Assert.assertEquals(progress, scheduler.getProgress(), 0);
@@ -433,6 +523,7 @@ public class TestTaskScheduler {
     final Priority mockPriority1 = Priority.newInstance(1);
     final Priority mockPriority2 = Priority.newInstance(2);
     final Priority mockPriority3 = Priority.newInstance(3);
+    final Priority mockPriority4 = Priority.newInstance(3);
     Object mockTask2 = mock(Object.class);
     Object mockCookie2 = mock(Object.class);
     Object mockTask3 = mock(Object.class);
@@ -587,6 +678,9 @@ public class TestTaskScheduler {
             if (allocations == 2) {
               return mockPriority3;
             }
+            if (allocations == 3) {
+              return mockPriority4;
+            }
             return null;
           }
         });
@@ -643,6 +737,100 @@ public class TestTaskScheduler {
     verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
 
+    // verify blacklisting
+    verify(mockRMClient, times(0)).addNodeToBlacklist((NodeId)any());
+    String badHost = "host6";
+    NodeId badNodeId = mock(NodeId.class);
+    when(badNodeId.getHost()).thenReturn(badHost);
+    scheduler.blacklistNode(badNodeId);
+    verify(mockRMClient, times(1)).addNodeToBlacklist(badNodeId);
+    Object mockTask4 = mock(Object.class);
+    Object mockCookie4 = mock(Object.class);
+    scheduler.allocateTask(mockTask4, mockCapability, null,
+        null, mockPriority4, null, mockCookie4);
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request4 = requestCaptor.getValue();
+    anyContainers.clear();
+    anyContainers.add(request4);
+    Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
+    when(mockContainer5.getNodeId()).thenReturn(badNodeId);
+    ContainerId mockCId5 = mock(ContainerId.class);
+    when(mockContainer5.getId()).thenReturn(mockCId5);
+    containers.clear();
+    containers.add(mockContainer5);
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    drainNotifier.set(false);
+    scheduler.onContainersAllocated(containers);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    // no new allocation
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    // verify blacklisted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId5);
+    verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
+    // verify request added back
+    verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
+    CookieContainerRequest request5 = requestCaptor.getValue();
+    anyContainers.clear();
+    anyContainers.add(request5);
+    Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
+    ContainerId mockCId6 = mock(ContainerId.class);
+    when(mockContainer6.getId()).thenReturn(mockCId6);
+    containers.clear();
+    containers.add(mockContainer6);
+    when(
+        mockRMClient.getMatchingRequestsForTopPriority(
+            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return anyList;
+          }
+
+        }).thenAnswer(
+        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
+          @Override
+          public List<? extends Collection<CookieContainerRequest>> answer(
+              InvocationOnMock invocation) throws Throwable {
+            return emptyList;
+          }
+
+        });
+    drainNotifier.set(false);
+    scheduler.onContainersAllocated(containers);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    // new allocation
+    verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
+    verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
+    // deallocate allocated task
+    assertTrue(scheduler.deallocateTask(mockTask4, true));
+    drainableAppCallback.drain();
+    verify(mockApp).containerBeingReleased(mockCId6);
+    verify(mockRMClient).releaseAssignedContainer(mockCId6);
+    verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
 
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
index 31304a9..b7ce891 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
@@ -21,30 +21,67 @@ package org.apache.tez.dag.app.rm.node;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-public class TestAMNodeMap {
+import com.google.common.collect.Lists;
 
-  @Test
-  @SuppressWarnings({ "resource", "rawtypes" })
-  public void testHealthUpdateKnownNode() {
+@SuppressWarnings({ "resource", "rawtypes" })
+public class TestAMNodeMap {
 
-    DrainDispatcher dispatcher = new DrainDispatcher();
+  DrainDispatcher dispatcher;
+  EventHandler eventHandler;
+  
+  @Before
+  public void setup() {
+    dispatcher = new DrainDispatcher();
     dispatcher.init(new Configuration());
     dispatcher.start();
-    EventHandler eventHandler = dispatcher.getEventHandler();
-
+    eventHandler = dispatcher.getEventHandler();
+  }
+  
+  class TestEventHandler implements EventHandler{
+    List<Event> events = Lists.newLinkedList();
+    @SuppressWarnings("unchecked")
+    @Override
+    public void handle(Event event) {
+      events.add(event);
+      eventHandler.handle(event);
+    }
+  }
+  
+  @After
+  public void teardown() {
+    dispatcher.stop();
+  }
+  
+  @Test(timeout=5000)
+  public void testHealthUpdateKnownNode() {
     AppContext appContext = mock(AppContext.class);
 
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    amNodeMap.init(new Configuration(false));
+    amNodeMap.start();
 
     NodeId nodeId = NodeId.newInstance("host1", 2342);
     amNodeMap.nodeSeen(nodeId);
@@ -53,27 +90,85 @@ public class TestAMNodeMap {
     amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
     dispatcher.await();
     assertEquals(AMNodeState.UNHEALTHY, amNodeMap.get(nodeId).getState());
-    dispatcher.stop();
+    amNodeMap.stop();
   }
 
-  @Test
-  @SuppressWarnings({ "resource", "rawtypes" })
+  @Test(timeout=5000)
   public void testHealthUpdateUnknownNode() {
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    EventHandler eventHandler = dispatcher.getEventHandler();
-
     AppContext appContext = mock(AppContext.class);
 
     AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
+    amNodeMap.init(new Configuration(false));
+    amNodeMap.start();
 
     NodeId nodeId = NodeId.newInstance("unknownhost", 2342);
 
     NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
     amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
+    dispatcher.await();
 
+    amNodeMap.stop();
     // No exceptions - the status update was ignored. Not bothering to capture
     // the log message for verification.
-    dispatcher.stop();
+  }
+  
+  @Test(timeout=5000)
+  public void testNodeSelfBlacklist() {
+    AppContext appContext = mock(AppContext.class);
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
+    TestEventHandler handler = new TestEventHandler();
+    AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
+    amNodeMap.init(conf);
+    amNodeMap.start();
+
+    NodeId nodeId = NodeId.newInstance("host1", 1234);
+    amNodeMap.nodeSeen(nodeId);
+    AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
+    
+    ContainerId cId1 = mock(ContainerId.class);
+    ContainerId cId2 = mock(ContainerId.class);
+    ContainerId cId3 = mock(ContainerId.class);
+    
+    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
+    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
+    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
+    assertEquals(3, node.containers.size());
+    
+    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
+    
+    amNodeMap.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
+    assertEquals(1, node.numSuccessfulTAs);
+    
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+    // duplicate should not affect anything
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(1, node.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node.getState());
+    
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta3, true));
+    assertEquals(1, node.numSuccessfulTAs);
+    assertEquals(2, node.numFailedTAs);
+    assertEquals(AMNodeState.BLACKLISTED, node.getState());
+    
+    assertEquals(5, handler.events.size());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+    assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
+    assertEquals(cId2, ((AMContainerEventNodeFailed)handler.events.get(1)).getContainerId());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(2).getType());
+    assertEquals(cId3, ((AMContainerEventNodeFailed)handler.events.get(2)).getContainerId());
+    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
+    assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(3)).getNodeId());
+    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(4).getType());
+    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklisted)handler.events.get(4)).getNodeId());
+    amNodeMap.stop();
   }
 
   private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index f98f392..b39457a 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -80,7 +80,7 @@ public class TestMRRJobs {
     }
 
     if (mrrTezCluster == null) {
-      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 3,
           1, 1);
       Configuration conf = new Configuration();
       conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/206a1ed6/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 9569f6e..db259b1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -91,6 +91,7 @@ public class TestFaultTolerance {
       TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
       tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
           remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
 
       AMConfiguration amConfig = new AMConfiguration(
           new HashMap<String, String>(), new HashMap<String, LocalResource>(),