You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 05:41:00 UTC

[26/50] [abbrv] hadoop git commit: YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. (Junping Du via wangda)

YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. (Junping Du via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bc913a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bc913a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bc913a3

Branch: refs/heads/HDFS-7285
Commit: 9bc913a35c46e65d373c3ae3f01a377e16e8d0ca
Parents: 3f42753
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Sep 18 10:04:17 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 18 10:04:17 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/NodesListManager.java       |   2 +-
 .../resourcemanager/ResourceTrackerService.java |  20 +-
 .../resourcemanager/rmnode/RMNodeEventType.java |   2 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      | 456 +++++++++++++------
 .../resourcemanager/TestRMNodeTransitions.java  | 196 +++++++-
 .../resourcemanager/webapp/TestNodesPage.java   |   3 +-
 7 files changed, 513 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8fe686d..32bf7dc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -197,6 +197,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4034. Render cluster Max Priority in scheduler metrics in RM web
     UI. (Rohith Sharma K S via jianhe)
 
+    YARN-3212. RMNode State Transition Update with DECOMMISSIONING state.
+    (Junping Du via wangda)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index abea85e..1e8b98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -399,7 +399,7 @@ public class NodesListManager extends CompositeService implements
       NodeId nodeId = entry.getKey();
       if (!isValidNode(nodeId.getHost())) {
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT));
+            new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
       } else {
         // Recommissioning the nodes
         if (entry.getValue().getState() == NodeState.DECOMMISSIONING

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 100e991..7e774c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -399,8 +400,10 @@ public class ResourceTrackerService extends AbstractService implements
 
     NodeId nodeId = remoteNodeStatus.getNodeId();
 
-    // 1. Check if it's a valid (i.e. not excluded) node
-    if (!this.nodesListManager.isValidNode(nodeId.getHost())) {
+    // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
+    // in decommissioning.
+    if (!this.nodesListManager.isValidNode(nodeId.getHost())
+        && !isNodeInDecommissioning(nodeId)) {
       String message =
           "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
               + nodeId.getHost();
@@ -486,6 +489,19 @@ public class ResourceTrackerService extends AbstractService implements
     return nodeHeartBeatResponse;
   }
 
+  /**
+   * Check whether the given node is currently in the DECOMMISSIONING state.
+   * @param nodeId id of the node to look up in the RM context
+   */
+  private boolean isNodeInDecommissioning(NodeId nodeId) {
+    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+    if (rmNode != null &&
+        rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
+      return true;
+    }
+    return false;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public UnRegisterNodeManagerResponse unRegisterNodeManager(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index 27ba1c0..ad36036 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -24,7 +24,7 @@ public enum RMNodeEventType {
   
   // Source: AdminService
   DECOMMISSION,
-  DECOMMISSION_WITH_TIMEOUT,
+  GRACEFUL_DECOMMISSION,
   RECOMMISSION,
   
   // Source: AdminService, ResourceTrackerService

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7a1ba74..391b6ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -144,101 +144,150 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
 
-     //Transitions from NEW state
-     .addTransition(NodeState.NEW, NodeState.RUNNING, 
-         RMNodeEventType.STARTED, new AddNodeTransition())
-     .addTransition(NodeState.NEW, NodeState.NEW,
-         RMNodeEventType.RESOURCE_UPDATE, 
-         new UpdateNodeResourceWhenUnusableTransition())
-
-     //Transitions from RUNNING state
-     .addTransition(NodeState.RUNNING,
-         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
-         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
-     .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
-         RMNodeEventType.DECOMMISSION,
-         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
-     .addTransition(NodeState.RUNNING, NodeState.LOST,
-         RMNodeEventType.EXPIRE,
-         new DeactivateNodeTransition(NodeState.LOST))
-     .addTransition(NodeState.RUNNING, NodeState.REBOOTED,
-         RMNodeEventType.REBOOTING,
-         new DeactivateNodeTransition(NodeState.REBOOTED))
-     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-         RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
-     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-         RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
-     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new AddContainersToBeRemovedFromNMTransition())
-     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
-     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
-     .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
-         RMNodeEventType.SHUTDOWN,
-         new DeactivateNodeTransition(NodeState.SHUTDOWN))
-
-     //Transitions from REBOOTED state
-     .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
-         RMNodeEventType.RESOURCE_UPDATE,
-         new UpdateNodeResourceWhenUnusableTransition())
-         
-     //Transitions from DECOMMISSIONED state
-     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
-         RMNodeEventType.RESOURCE_UPDATE,
-         new UpdateNodeResourceWhenUnusableTransition())
-     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
-         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new AddContainersToBeRemovedFromNMTransition())
-
-     //Transitions from LOST state
-     .addTransition(NodeState.LOST, NodeState.LOST,
-         RMNodeEventType.RESOURCE_UPDATE,
-         new UpdateNodeResourceWhenUnusableTransition())
-     .addTransition(NodeState.LOST, NodeState.LOST,
-         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new AddContainersToBeRemovedFromNMTransition())
-
-     //Transitions from UNHEALTHY state
-     .addTransition(NodeState.UNHEALTHY,
-         EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
-         RMNodeEventType.STATUS_UPDATE,
-         new StatusUpdateWhenUnHealthyTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
-         RMNodeEventType.DECOMMISSION,
-         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
-     .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
-         RMNodeEventType.EXPIRE,
-         new DeactivateNodeTransition(NodeState.LOST))
-     .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
-         RMNodeEventType.REBOOTING,
-         new DeactivateNodeTransition(NodeState.REBOOTED))
-     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
-         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
-         RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
-         RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
-         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
-         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new AddContainersToBeRemovedFromNMTransition())
-     .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
-         RMNodeEventType.SHUTDOWN,
-         new DeactivateNodeTransition(NodeState.SHUTDOWN))
-
-     //Transitions from SHUTDOWN state
-     .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
-         RMNodeEventType.RESOURCE_UPDATE,
-         new UpdateNodeResourceWhenUnusableTransition())
-     .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
-         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
-         new AddContainersToBeRemovedFromNMTransition())
-
-     // create the topology tables
-     .installTopology(); 
+      //Transitions from NEW state
+      .addTransition(NodeState.NEW, NodeState.RUNNING,
+          RMNodeEventType.STARTED, new AddNodeTransition())
+      .addTransition(NodeState.NEW, NodeState.NEW,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+
+      //Transitions from RUNNING state
+      .addTransition(NodeState.RUNNING,
+          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
+          RMNodeEventType.STATUS_UPDATE,
+          new StatusUpdateWhenHealthyTransition())
+      .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
+          RMNodeEventType.DECOMMISSION,
+          new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+      .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.GRACEFUL_DECOMMISSION,
+          new DecommissioningNodeTransition(NodeState.RUNNING,
+              NodeState.DECOMMISSIONING))
+      .addTransition(NodeState.RUNNING, NodeState.LOST,
+          RMNodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
+      .addTransition(NodeState.RUNNING, NodeState.REBOOTED,
+          RMNodeEventType.REBOOTING,
+          new DeactivateNodeTransition(NodeState.REBOOTED))
+      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+      .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING),
+          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+          RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
+      .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
+          RMNodeEventType.SHUTDOWN,
+          new DeactivateNodeTransition(NodeState.SHUTDOWN))
+
+      //Transitions from REBOOTED state
+      .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+
+      //Transitions from DECOMMISSIONED state
+      .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+      .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+
+       //Transitions from DECOMMISSIONING state
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+          RMNodeEventType.DECOMMISSION,
+          new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
+          RMNodeEventType.RECOMMISSION,
+          new RecommissionNodeTransition(NodeState.RUNNING))
+      .addTransition(NodeState.DECOMMISSIONING,
+          EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
+          RMNodeEventType.STATUS_UPDATE,
+          new StatusUpdateWhenHealthyTransition())
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.GRACEFUL_DECOMMISSION,
+          new DecommissioningNodeTransition(NodeState.DECOMMISSIONING,
+              NodeState.DECOMMISSIONING))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.LOST,
+          RMNodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
+          RMNodeEventType.REBOOTING,
+          new DeactivateNodeTransition(NodeState.REBOOTED))
+
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+
+      // TODO (in YARN-3223) update resource when container finished.
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+      // TODO (in YARN-3223) update resource when container finished.
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+      .addTransition(NodeState.DECOMMISSIONING, EnumSet.of(
+          NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
+          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenRunningTransition())
+
+      //Transitions from LOST state
+      .addTransition(NodeState.LOST, NodeState.LOST,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+      .addTransition(NodeState.LOST, NodeState.LOST,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+
+      //Transitions from UNHEALTHY state
+      .addTransition(NodeState.UNHEALTHY,
+          EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
+          RMNodeEventType.STATUS_UPDATE,
+          new StatusUpdateWhenUnHealthyTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
+          RMNodeEventType.DECOMMISSION,
+          new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+      .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONING,
+          RMNodeEventType.GRACEFUL_DECOMMISSION,
+          new DecommissioningNodeTransition(NodeState.UNHEALTHY,
+              NodeState.DECOMMISSIONING))
+      .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
+          RMNodeEventType.EXPIRE,
+          new DeactivateNodeTransition(NodeState.LOST))
+      .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
+          RMNodeEventType.REBOOTING,
+          new DeactivateNodeTransition(NodeState.REBOOTED))
+      .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY),
+          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+      .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
+          RMNodeEventType.SHUTDOWN,
+          new DeactivateNodeTransition(NodeState.SHUTDOWN))
+
+      //Transitions from SHUTDOWN state
+      .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenUnusableTransition())
+      .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
+          RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+          new AddContainersToBeRemovedFromNMTransition())
+
+      // create the topology tables
+      .installTopology();
 
   private final StateMachine<NodeState, RMNodeEventType,
                              RMNodeEvent> stateMachine;
@@ -265,7 +314,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     this.writeLock = lock.writeLock();
 
     this.stateMachine = stateMachineFactory.make(this);
-    
+
     this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
 
     this.containerAllocationExpirer = context.getContainerAllocationExpirer();
@@ -291,6 +340,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     return httpPort;
   }
 
+  // Test only
+  public void setHttpPort(int port) {
+    this.httpPort = port;
+  }
+
   @Override
   public NodeId getNodeID() {
     return this.nodeId;
@@ -497,23 +551,35 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       metrics.decrNumShutdownNMs();
       break;
     default:
-      LOG.debug("Unexpected previous node state");    
+      LOG.debug("Unexpected previous node state");
     }
   }
 
+  // Treats nodes in decommissioning as active nodes.
+  // TODO: we may want to differentiate active nodes and decommissioning nodes
+  // in metrics later.
+  private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    metrics.incrNumActiveNodes();
+    metrics.decrNumUnhealthyNMs();
+  }
+
   private void updateMetricsForDeactivatedNode(NodeState initialState,
                                                NodeState finalState) {
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
 
     switch (initialState) {
-      case RUNNING:
-        metrics.decrNumActiveNodes();
-        break;
-      case UNHEALTHY:
-        metrics.decrNumUnhealthyNMs();
-        break;
-      default:
-        LOG.debug("Unexpected inital state");
+    case RUNNING:
+      metrics.decrNumActiveNodes();
+      break;
+    case DECOMMISSIONING:
+      metrics.decrNumActiveNodes();
+      break;
+    case UNHEALTHY:
+      metrics.decrNumUnhealthyNMs();
+      break;
+    default:
+      LOG.debug("Unexpected inital state");
     }
 
     switch (finalState) {
@@ -608,10 +674,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   public static class ReconnectNodeTransition implements
-      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+      MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
 
     @Override
-    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+    public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
       RMNode newNode = reconnectEvent.getReconnectedNode();
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
@@ -622,6 +688,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       // No application running on the node, so send node-removal event with 
       // cleaning up old container info.
       if (noRunningApps) {
+        if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+          // When the node is decommissioning and has no running apps,
+          // deactivate it and move it straight to the DECOMMISSIONED state.
+          deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+          return NodeState.DECOMMISSIONED;
+        }
         rmNode.nodeUpdateQueue.clear();
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
@@ -652,6 +724,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             rmNode.context.getDispatcher().getEventHandler().handle(
                 new RMNodeStartedEvent(newNode.getNodeID(), null, null));
         }
+
       } else {
         rmNode.httpPort = newNode.getHttpPort();
         rmNode.httpAddress = newNode.getHttpAddress();
@@ -678,17 +751,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                   new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
                       .newInstance(newNode.getTotalCapability(), -1)));
         }
+
       }
+      return rmNode.getState();
     }
 
     private void handleNMContainerStatus(
         List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
-      List<ContainerStatus> containerStatuses =
-          new ArrayList<ContainerStatus>();
-      for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
-        containerStatuses.add(createContainerStatus(nmContainerStatus));
+      if (nmContainerStatuses != null) {
+        List<ContainerStatus> containerStatuses =
+            new ArrayList<ContainerStatus>();
+        for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+          containerStatuses.add(createContainerStatus(nmContainerStatus));
+        }
+        rmnode.handleContainerStatus(containerStatuses);
       }
-      rmnode.handleContainerStatus(containerStatuses);
     }
 
     private ContainerStatus createContainerStatus(
@@ -770,31 +847,94 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      // Inform the scheduler
-      rmNode.nodeUpdateQueue.clear();
-      // If the current state is NodeState.UNHEALTHY
-      // Then node is already been removed from the
-      // Scheduler
-      NodeState initialState = rmNode.getState();
-      if (!initialState.equals(NodeState.UNHEALTHY)) {
-        rmNode.context.getDispatcher().getEventHandler()
-          .handle(new NodeRemovedSchedulerEvent(rmNode));
+      RMNodeImpl.deactivateNode(rmNode, finalState);
+    }
+  }
+
+  /**
+   * Deactivate a node: report it unusable and move it to the inactive nodes.
+   * @param rmNode the node to deactivate
+   * @param finalState the state the node is moving to (e.g. DECOMMISSIONED)
+   */
+  public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
+
+    reportNodeUnusable(rmNode, finalState);
+
+    // Deactivate the node
+    rmNode.context.getRMNodes().remove(rmNode.nodeId);
+    LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+        + finalState);
+    rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+  }
+
+  /**
+   * Report the node as unusable and update the cluster metrics.
+   * @param rmNode the node being reported
+   * @param finalState the state the node is moving to (used for metrics)
+   */
+  public static void reportNodeUnusable(RMNodeImpl rmNode,
+      NodeState finalState) {
+    // Inform the scheduler
+    rmNode.nodeUpdateQueue.clear();
+    // If the current state is NodeState.UNHEALTHY,
+    // then the node has already been removed from the
+    // scheduler.
+    NodeState initialState = rmNode.getState();
+    if (!initialState.equals(NodeState.UNHEALTHY)) {
+      rmNode.context.getDispatcher().getEventHandler()
+        .handle(new NodeRemovedSchedulerEvent(rmNode));
+    }
+    rmNode.context.getDispatcher().getEventHandler().handle(
+        new NodesListManagerEvent(
+            NodesListManagerEventType.NODE_UNUSABLE, rmNode));
+
+    //Update the metrics
+    rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
+  }
+
+  /**
+   * The transition to put node in decommissioning state.
+   */
+  public static class DecommissioningNodeTransition
+      implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+    private final NodeState initState;
+    private final NodeState finalState;
+
+    public DecommissioningNodeTransition(NodeState initState,
+        NodeState finalState) {
+      this.initState = initState;
+      this.finalState = finalState;
+    }
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
+      if (initState.equals(NodeState.UNHEALTHY)) {
+        rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
       }
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodesListManagerEvent(
-              NodesListManagerEventType.NODE_UNUSABLE, rmNode));
+      // TODO (in YARN-3223) Keep NM's available resource to be 0
+    }
+  }
+
+  public static class RecommissionNodeTransition
+      implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
-      // Deactivate the node
-      rmNode.context.getRMNodes().remove(rmNode.nodeId);
-      LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
-          + finalState);
-      rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+    private final NodeState finalState;
+    public RecommissionNodeTransition(NodeState finalState) {
+      this.finalState = finalState;
+    }
 
-      //Update the metrics
-      rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
+          "recommissioned back to RUNNING.");
+      // TODO handle NM resource resume in YARN-3223.
     }
   }
 
+  /**
+   * Status update transition when node is healthy.
+   */
   public static class StatusUpdateWhenHealthyTransition implements
       MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
     @Override
@@ -805,25 +945,44 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       // Switch the last heartbeatresponse.
       rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
 
-      NodeHealthStatus remoteNodeHealthStatus = 
+      NodeHealthStatus remoteNodeHealthStatus =
           statusEvent.getNodeHealthStatus();
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
       rmNode.setLastHealthReportTime(
           remoteNodeHealthStatus.getLastHealthReportTime());
+      NodeState initialState = rmNode.getState();
+      boolean isNodeDecommissioning =
+          initialState.equals(NodeState.DECOMMISSIONING);
       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
-        LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
-            + remoteNodeHealthStatus.getHealthReport());
-        rmNode.nodeUpdateQueue.clear();
-        // Inform the scheduler
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new NodeRemovedSchedulerEvent(rmNode));
-        rmNode.context.getDispatcher().getEventHandler().handle(
-            new NodesListManagerEvent(
-                NodesListManagerEventType.NODE_UNUSABLE, rmNode));
-        // Update metrics
-        rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
-            NodeState.UNHEALTHY);
-        return NodeState.UNHEALTHY;
+        LOG.info("Node " + rmNode.nodeId +
+            " reported UNHEALTHY with details: " +
+            remoteNodeHealthStatus.getHealthReport());
+        // if a node in decommissioning receives an unhealthy report,
+        // it will keep decommissioning.
+        if (isNodeDecommissioning) {
+          return NodeState.DECOMMISSIONING;
+        } else {
+          reportNodeUnusable(rmNode, NodeState.UNHEALTHY);
+          return NodeState.UNHEALTHY;
+        }
+      }
+      if (isNodeDecommissioning) {
+        List<ApplicationId> runningApps = rmNode.getRunningApps();
+
+        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
+
+        // no running (and keeping alive) app on this node, get it
+        // decommissioned.
+        // TODO may need to check no container is being scheduled on this node
+        // as well.
+        if ((runningApps == null || runningApps.size() == 0)
+            && (keepAliveApps == null || keepAliveApps.size() == 0)) {
+          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+          return NodeState.DECOMMISSIONED;
+        }
+
+        // TODO (in YARN-3223) if node in decommissioning, get node resource
+        // updated if container get finished (keep available resource to be 0)
       }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
@@ -848,7 +1007,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           statusEvent.getKeepAliveAppIds());
       }
 
-      return NodeState.RUNNING;
+      return initialState;
     }
   }
 
@@ -857,11 +1016,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
+      RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event;
 
       // Switch the last heartbeatresponse.
       rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
-      NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
+      NodeHealthStatus remoteNodeHealthStatus =
+          statusEvent.getNodeHealthStatus();
       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
       rmNode.setLastHealthReportTime(
           remoteNodeHealthStatus.getLastHealthReportTime());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 61c6166..a6e1575 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -29,7 +29,9 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -75,7 +77,7 @@ import org.mockito.stubbing.Answer;
 public class TestRMNodeTransitions {
 
   RMNodeImpl node;
-  
+
   private RMContext rmContext;
   private YarnScheduler scheduler;
 
@@ -168,6 +170,42 @@ public class TestRMNodeTransitions {
     return event;
   }
   
+  private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
+    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
+
+    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
+    Boolean yes = new Boolean(true);
+    doReturn(yes).when(healthStatus).getIsNodeHealthy();
+
+    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
+    doReturn(healthStatus).when(event).getNodeHealthStatus();
+    doReturn(response).when(event).getLatestResponse();
+    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
+    return event;
+  }
+
+  private List<ApplicationId> getAppIdList() {
+    List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
+    appIdList.add(BuilderUtils.newApplicationId(0, 0));
+    return appIdList;
+  }
+
+  private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
+    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
+
+    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
+    Boolean yes = new Boolean(true);
+    doReturn(yes).when(healthStatus).getIsNodeHealthy();
+
+    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
+    doReturn(healthStatus).when(event).getNodeHealthStatus();
+    doReturn(response).when(event).getLatestResponse();
+    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    doReturn(null).when(event).getKeepAliveAppIds();
+    return event;
+  }
+
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
@@ -195,7 +233,33 @@ public class TestRMNodeTransitions {
      */
     verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));     
   }
-  
+
+  @Test
+  public void testStatusUpdateOnDecommissioningNode(){
+    RMNodeImpl node = getDecommissioningNode();
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    // Verify node in DECOMMISSIONING won't be changed by status update
+    // with running apps
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
+    node.handle(statusEvent);
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    // Verify node in DECOMMISSIONING will be changed by status update
+    // without running apps
+    statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
+    node.handle(statusEvent);
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+  @Test
+  public void testRecommissionNode(){
+    RMNodeImpl node = getDecommissioningNode();
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.RECOMMISSION));
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+  }
+
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
     //Start the node
@@ -253,9 +317,9 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
         .getContainerId()); 
     Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
-        .getContainerId());   
+        .getContainerId());
   }
-  
+
   @Test (timeout = 5000)
   public void testStatusChange(){
     //Start the node
@@ -292,7 +356,7 @@ public class TestRMNodeTransitions {
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
     Assert.assertEquals(0, node.getQueueSize());
   }
-  
+
   @Test
   public void testRunningExpire() {
     RMNodeImpl node = getRunningNode();
@@ -375,7 +439,7 @@ public class TestRMNodeTransitions {
         initialRebooted, cm.getNumRebootedNMs());
     Assert.assertEquals(NodeState.LOST, node.getState());
   }
-  
+
   @Test
   public void testUnhealthyExpireForSchedulerRemove() {
     RMNodeImpl node = getUnhealthyNode();
@@ -408,6 +472,28 @@ public class TestRMNodeTransitions {
   }
 
   @Test
+  public void testDecommissionOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.DECOMMISSION));
+    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned + 1, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+  @Test
   public void testUnhealthyDecommission() {
     RMNodeImpl node = getUnhealthyNode();
     ClusterMetrics cm = ClusterMetrics.getMetrics();
@@ -429,6 +515,30 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
   }
 
+  // Test Decommissioning on an unhealthy node will make it decommissioning.
+  @Test
+  public void testUnhealthyDecommissioning() {
+    RMNodeImpl node = getUnhealthyNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.GRACEFUL_DECOMMISSION));
+    Assert.assertEquals("Active Nodes", initialActive + 1,
+        cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy - 1, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
+        cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+  }
+
   @Test
   public void testRunningRebooting() {
     RMNodeImpl node = getRunningNode();
@@ -567,6 +677,14 @@ public class TestRMNodeTransitions {
     return node;
   }
 
+  private RMNodeImpl getDecommissioningNode() {
+    RMNodeImpl node = getRunningNode();
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.GRACEFUL_DECOMMISSION));
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    return node;
+  }
+
   private RMNodeImpl getUnhealthyNode() {
     RMNodeImpl node = getRunningNode();
     NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
@@ -577,20 +695,19 @@ public class TestRMNodeTransitions {
     return node;
   }
 
-
   private RMNodeImpl getNewNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
     return node;
   }
-  
+
   private RMNodeImpl getNewNode(Resource capability) {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, 
         capability, null);
     return node;
   }
-  
+
   private RMNodeImpl getRebootedNode() {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     Resource capability = Resource.newInstance(4096, 4);
@@ -650,7 +767,39 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
         nodesListManagerEvent.getType());
   }
-  
+
+  @Test
+  public void testReconnectOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+
+    // Reconnect event with running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
+        getAppIdList(), null));
+    // still decommissioning
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    // Reconnect event without any running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+  @Test
+  public void testReconnectWithNewPortOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Random r= new Random();
+    node.setHttpPort(r.nextInt(10000));
+    // Reconnect event with running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
+        getAppIdList(), null));
+    // still decommissioning
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    node.setHttpPort(r.nextInt(10000));
+    // Reconnect event without any running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
   @Test
   public void testResourceUpdateOnRunningNode() {
     RMNodeImpl node = getRunningNode();
@@ -658,18 +807,23 @@ public class TestRMNodeTransitions {
     assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
     assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
     node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
-        ResourceOption.newInstance(Resource.newInstance(2048, 2), 
+        ResourceOption.newInstance(Resource.newInstance(2048, 2),
             ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
     Resource newCapacity = node.getTotalCapability();
     assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
     assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
-    
+
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     Assert.assertNotNull(nodesListManagerEvent);
     Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
         nodesListManagerEvent.getType());
   }
-  
+
+  @Test
+  public void testDecommissioningOnRunningNode(){
+    getDecommissioningNode();
+  }
+
   @Test
   public void testResourceUpdateOnNewNode() {
     RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
@@ -682,10 +836,10 @@ public class TestRMNodeTransitions {
     Resource newCapacity = node.getTotalCapability();
     assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
     assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
-    
+
     Assert.assertEquals(NodeState.NEW, node.getState());
   }
-  
+
   @Test
   public void testResourceUpdateOnRebootedNode() {
     RMNodeImpl node = getRebootedNode();
@@ -702,6 +856,18 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.REBOOTED, node.getState());
   }
 
+  // Test that an unhealthy report on a decommissioning node leaves it
+  // in the DECOMMISSIONING state.
+  @Test
+  public void testDecommissioningUnhealthy() {
+    RMNodeImpl node = getDecommissioningNode();
+    NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
+        System.currentTimeMillis());
+    node.handle(new RMNodeStatusEvent(node.getNodeID(), status,
+        new ArrayList<ContainerStatus>(), null, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+  }
+
   @Test
   public void testReconnnectUpdate() {
     final String nmVersion1 = "nm version 1";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
index b70fdc1..458b240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
@@ -43,8 +43,7 @@ public class TestNodesPage {
   final int numberOfNodesPerRack = 8;
   // The following is because of the way TestRMWebApp.mockRMContext creates
   // nodes.
-  final int numberOfLostNodesPerRack = numberOfNodesPerRack
-      / NodeState.values().length;
+  final int numberOfLostNodesPerRack = 1;
 
   // Number of Actual Table Headers for NodesPage.NodesBlock might change in
   // future. In that case this value should be adjusted to the new value.