You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/19 23:42:53 UTC

git commit: TEZ-845. Handle un-blacklisting of nodes (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 816e2e5a9 -> 972dbc6fb


TEZ-845. Handle un-blacklisting of nodes (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/972dbc6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/972dbc6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/972dbc6f

Branch: refs/heads/master
Commit: 972dbc6fb2afb983b3e780ce19b3c76c24e67c7d
Parents: 816e2e5
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 19 14:42:10 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 19 14:42:23 2014 -0800

----------------------------------------------------------------------
 .../rm/AMSchedulerEventNodeBlacklistUpdate.java | 36 +++++++++++
 .../app/rm/AMSchedulerEventNodeBlacklisted.java | 35 -----------
 .../tez/dag/app/rm/AMSchedulerEventType.java    |  1 +
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  7 +++
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 25 +++++++-
 .../tez/dag/app/rm/TezAMRMClientAsync.java      |  5 ++
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  | 10 ++-
 .../apache/tez/dag/app/rm/node/AMNodeMap.java   |  4 +-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 10 ++-
 .../tez/dag/app/rm/node/TestAMNodeMap.java      | 64 ++++++++++++++++++--
 .../org/apache/tez/mapreduce/TestMRRJobs.java   |  2 +-
 11 files changed, 152 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
new file mode 100644
index 0000000..ed7ebc3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
+
+  private final NodeId nodeId;
+
+  public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) {
+    super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
+        : AMSchedulerEventType.S_NODE_UNBLACKLISTED));
+    this.nodeId = nodeId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
deleted file mode 100644
index 9cf3d65..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklisted.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-
-public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
-
-  private final NodeId nodeId;
-
-  public AMSchedulerEventNodeBlacklisted(NodeId nodeId) {
-    super(AMSchedulerEventType.S_NODE_BLACKLISTED);
-    this.nodeId = nodeId;
-  }
-
-  public NodeId getNodeId() {
-    return this.nodeId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
index 0554cff..8a4c371 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
@@ -31,6 +31,7 @@ public enum AMSchedulerEventType {
 
   //Producer: Node
   S_NODE_BLACKLISTED,
+  S_NODE_UNBLACKLISTED,
   S_NODE_UNHEALTHY,
   S_NODE_HEALTHY,
   // The scheduler should have a way of knowing about unusable nodes. Acting on

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 814af6b..6ac9481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -262,6 +262,7 @@ public class TaskScheduler extends AbstractService
   }
 
   public int getClusterNodeCount() {
+    // this can potentially be cheaper after YARN-1722
     return amRmClient.getClusterNodeCount();
   }
 
@@ -790,6 +791,12 @@ public class TaskScheduler extends AbstractService
     blacklistedNodes.add(nodeId);
   }
   
+  public synchronized void unblacklistNode(NodeId nodeId) {
+    if (blacklistedNodes.remove(nodeId)) {
+      amRmClient.removeNodeFromBlacklist(nodeId);
+    }
+  }
+  
   public synchronized void allocateTask(
       Object task,
       Resource capability,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 550b3a2..83816b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -58,6 +58,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
@@ -79,6 +80,7 @@ public class TaskSchedulerEventHandler extends AbstractService
   protected volatile boolean isSignalled = false;
   final DAGClientServer clientService;
   private final ContainerSignatureMatcher containerSignatureMatcher;
+  private int cachedNodeCount = -1;
 
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
@@ -137,14 +139,18 @@ public class TaskSchedulerEventHandler extends AbstractService
       break;
     case S_CONTAINER_COMPLETED:
       break;
+    case S_NODE_UNBLACKLISTED:
+      // fall through
     case S_NODE_BLACKLISTED:
-      handleNodeBlacklist((AMSchedulerEventNodeBlacklisted)sEvent);
+      handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
       break;
     case S_NODE_UNHEALTHY:
       break;
     case S_NODE_HEALTHY:
       // Consider changing this to work like BLACKLISTING.
       break;
+    default:
+      break;
     }
   }
 
@@ -171,8 +177,14 @@ public class TaskSchedulerEventHandler extends AbstractService
     eventHandler.handle(event);
   }
 
-  private void handleNodeBlacklist(AMSchedulerEventNodeBlacklisted event) {
-    taskScheduler.blacklistNode(event.getNodeId());
+  private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
+    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+      taskScheduler.blacklistNode(event.getNodeId());
+    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+      taskScheduler.unblacklistNode(event.getNodeId());
+    } else {
+      throw new TezUncheckedException("Invalid event type: " + event.getType());
+    }
   }
 
   private void handleContainerDeallocate(
@@ -507,6 +519,13 @@ public class TaskSchedulerEventHandler extends AbstractService
   // complete and can hence lead to a deadlock if called from within a TSEH lock.
   @Override
   public float getProgress() {
+    // at this point allocate has been called and so node count must be available
+    // may change after YARN-1722
+    int nodeCount = taskScheduler.getClusterNodeCount();
+    if (nodeCount != cachedNodeCount) {
+      cachedNodeCount = nodeCount;
+      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount));
+    }
     return dagAppMaster.getProgress();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index e90ed0f..dad9473 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -86,6 +86,11 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
   public synchronized void addNodeToBlacklist(NodeId nodeId) {
     client.updateBlacklist(Collections.singletonList(nodeId.getHost()), null);
   }
+  
+  //Remove after YARN-1723 is fixed
+   public synchronized void removeNodeFromBlacklist(NodeId nodeId) {
+     client.updateBlacklist(null, Collections.singletonList(nodeId.getHost()));
+   }
 
   @Override
   public synchronized void addContainerRequest(T req) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index 256eb1f..65c4b2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
@@ -248,9 +248,12 @@ public class AMNodeImpl implements AMNode {
     for (ContainerId c : containers) {
       sendEvent(new AMContainerEventNodeFailed(c, "Node failed"));
     }
+    // these containers are not useful anymore
+    pastContainers.addAll(containers);
+    containers.clear();
     sendEvent(new AMNodeEvent(getNodeId(),
         AMNodeEventType.N_NODE_WAS_BLACKLISTED));
-    sendEvent(new AMSchedulerEventNodeBlacklisted(getNodeId()));
+    sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
   }
 
   @SuppressWarnings("unchecked")
@@ -343,6 +346,9 @@ public class AMNodeImpl implements AMNode {
     @Override
     public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
       node.ignoreBlacklisting = ignore;
+      if (node.getState() == AMNodeState.BLACKLISTED) {
+        node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
index 0336a9e..f815836 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
@@ -132,13 +132,15 @@ public class AMNodeMap extends AbstractService implements
     NodeId nodeId = rEvent.getNodeId();
     switch (rEvent.getType()) {
     case N_NODE_WAS_BLACKLISTED:
-   // When moving away from IGNORE_BLACKLISTING state, nodes will send out blacklisted events. These need to be ignored.
+      // When moving away from IGNORE_BLACKLISTING state, nodes will send out
+      // blacklisted events. These need to be ignored.
       addToBlackList(nodeId);
       computeIgnoreBlacklisting();
       break;
     case N_NODE_COUNT_UPDATED:
       AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent;
       numClusterNodes = event.getNodeCount();
+      LOG.info("Num cluster nodes = " + numClusterNodes);
       computeIgnoreBlacklisting();
       break;
     case N_TURNED_UNHEALTHY:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index a01412c..ff4036d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -437,7 +437,11 @@ public class TestTaskScheduler {
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
     verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
-    
+    // test unblacklist
+    scheduler.unblacklistNode(badNodeId);
+    verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
+    assertEquals(0, scheduler.blacklistedNodes.size());
+
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);
     Assert.assertEquals(progress, scheduler.getProgress(), 0);
@@ -831,6 +835,10 @@ public class TestTaskScheduler {
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
     verify(mockRMClient, times(5)).releaseAssignedContainer((ContainerId) any());
+    // test unblacklist
+    scheduler.unblacklistNode(badNodeId);
+    verify(mockRMClient, times(1)).removeNodeFromBlacklist(badNodeId);
+    assertEquals(0, scheduler.blacklistedNodes.size());
 
     float progress = 0.5f;
     when(mockApp.getProgress()).thenReturn(progress);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
index b7ce891..a5b6e58 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeMap.java
@@ -34,9 +34,8 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklisted;
+import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -119,11 +118,19 @@ public class TestAMNodeMap {
     conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
     TestEventHandler handler = new TestEventHandler();
     AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
+    dispatcher.register(AMNodeEventType.class, amNodeMap);
     amNodeMap.init(conf);
     amNodeMap.start();
 
+    amNodeMap.handle(new AMNodeEventNodeCountUpdated(4));
     NodeId nodeId = NodeId.newInstance("host1", 1234);
+    NodeId nodeId2 = NodeId.newInstance("host2", 1234);
+    NodeId nodeId3 = NodeId.newInstance("host3", 1234);
+    NodeId nodeId4 = NodeId.newInstance("host4", 1234);
     amNodeMap.nodeSeen(nodeId);
+    amNodeMap.nodeSeen(nodeId2);
+    amNodeMap.nodeSeen(nodeId3);
+    amNodeMap.nodeSeen(nodeId4);
     AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
     
     ContainerId cId1 = mock(ContainerId.class);
@@ -152,7 +159,8 @@ public class TestAMNodeMap {
     assertEquals(1, node.numFailedTAs);
     assertEquals(AMNodeState.ACTIVE, node.getState());
     
-    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta3, true));
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
+    dispatcher.await();
     assertEquals(1, node.numSuccessfulTAs);
     assertEquals(2, node.numFailedTAs);
     assertEquals(AMNodeState.BLACKLISTED, node.getState());
@@ -167,7 +175,55 @@ public class TestAMNodeMap {
     assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
     assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(3)).getNodeId());
     assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(4).getType());
-    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklisted)handler.events.get(4)).getNodeId());
+    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklistUpdate)handler.events.get(4)).getNodeId());
+    
+    ContainerId cId4 = mock(ContainerId.class);
+    ContainerId cId5 = mock(ContainerId.class);
+    TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
+    TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
+    AMNodeImpl node2 = (AMNodeImpl) amNodeMap.get(nodeId2);
+    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
+    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
+    
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
+    assertEquals(1, node2.numFailedTAs);
+    assertEquals(AMNodeState.ACTIVE, node2.getState());
+    
+    handler.events.clear();
+    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
+    dispatcher.await();
+    assertEquals(2, node2.numFailedTAs);
+    assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
+    AMNodeImpl node3 = (AMNodeImpl)amNodeMap.get(nodeId3);
+    assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
+    assertEquals(10, handler.events.size());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
+    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
+    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(2).getType());
+    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(3).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(4).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(5).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(6).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(7).getType());
+    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(8).getType());
+    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(9).getType());
+    
+    handler.events.clear();
+    amNodeMap.handle(new AMNodeEventNodeCountUpdated(8));
+    dispatcher.await();
+    assertEquals(AMNodeState.BLACKLISTED, node.getState());
+    assertEquals(AMNodeState.BLACKLISTED, node2.getState());
+    assertEquals(AMNodeState.ACTIVE, node3.getState());
+    assertEquals(8, handler.events.size());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(0).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(1).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(2).getType());
+    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, handler.events.get(3).getType());
+    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(4).getType());
+    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(5).getType());
+    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(6).getType());
+    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(7).getType());
+    
     amNodeMap.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/972dbc6f/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index b39457a..f98f392 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -80,7 +80,7 @@ public class TestMRRJobs {
     }
 
     if (mrrTezCluster == null) {
-      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 3,
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
           1, 1);
       Configuration conf = new Configuration();
       conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS