You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 05:41:00 UTC
[26/50] [abbrv] hadoop git commit: YARN-3212. RMNode State Transition
Update with DECOMMISSIONING state. (Junping Du via wangda)
YARN-3212. RMNode State Transition Update with DECOMMISSIONING state. (Junping Du via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bc913a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bc913a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bc913a3
Branch: refs/heads/HDFS-7285
Commit: 9bc913a35c46e65d373c3ae3f01a377e16e8d0ca
Parents: 3f42753
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Sep 18 10:04:17 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Sep 18 10:04:17 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/NodesListManager.java | 2 +-
.../resourcemanager/ResourceTrackerService.java | 20 +-
.../resourcemanager/rmnode/RMNodeEventType.java | 2 +-
.../resourcemanager/rmnode/RMNodeImpl.java | 456 +++++++++++++------
.../resourcemanager/TestRMNodeTransitions.java | 196 +++++++-
.../resourcemanager/webapp/TestNodesPage.java | 3 +-
7 files changed, 513 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8fe686d..32bf7dc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -197,6 +197,9 @@ Release 2.8.0 - UNRELEASED
YARN-4034. Render cluster Max Priority in scheduler metrics in RM web
UI. (Rohith Sharma K S via jianhe)
+ YARN-3212. RMNode State Transition Update with DECOMMISSIONING state.
+ (Junping Du via wangda)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index abea85e..1e8b98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -399,7 +399,7 @@ public class NodesListManager extends CompositeService implements
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION_WITH_TIMEOUT));
+ new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 100e991..7e774c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -399,8 +400,10 @@ public class ResourceTrackerService extends AbstractService implements
NodeId nodeId = remoteNodeStatus.getNodeId();
- // 1. Check if it's a valid (i.e. not excluded) node
- if (!this.nodesListManager.isValidNode(nodeId.getHost())) {
+ // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
+ // in decommissioning.
+ if (!this.nodesListManager.isValidNode(nodeId.getHost())
+ && !isNodeInDecommissioning(nodeId)) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();
@@ -486,6 +489,19 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse;
}
+ /**
+ * Returns true iff the node is tracked by the RM and is DECOMMISSIONING.
+ * @param nodeId id of the node to look up in the RM context's node map
+ */
+ private boolean isNodeInDecommissioning(NodeId nodeId) {
+ RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+ if (rmNode != null &&
+ rmNode.getState().equals(NodeState.DECOMMISSIONING)) {
+ return true;
+ }
+ return false;
+ }
+
@SuppressWarnings("unchecked")
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index 27ba1c0..ad36036 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -24,7 +24,7 @@ public enum RMNodeEventType {
// Source: AdminService
DECOMMISSION,
- DECOMMISSION_WITH_TIMEOUT,
+ GRACEFUL_DECOMMISSION,
RECOMMISSION,
// Source: AdminService, ResourceTrackerService
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7a1ba74..391b6ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -144,101 +144,150 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
- //Transitions from NEW state
- .addTransition(NodeState.NEW, NodeState.RUNNING,
- RMNodeEventType.STARTED, new AddNodeTransition())
- .addTransition(NodeState.NEW, NodeState.NEW,
- RMNodeEventType.RESOURCE_UPDATE,
- new UpdateNodeResourceWhenUnusableTransition())
-
- //Transitions from RUNNING state
- .addTransition(NodeState.RUNNING,
- EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
- RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
- .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
- RMNodeEventType.DECOMMISSION,
- new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
- .addTransition(NodeState.RUNNING, NodeState.LOST,
- RMNodeEventType.EXPIRE,
- new DeactivateNodeTransition(NodeState.LOST))
- .addTransition(NodeState.RUNNING, NodeState.REBOOTED,
- RMNodeEventType.REBOOTING,
- new DeactivateNodeTransition(NodeState.REBOOTED))
- .addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
- .addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
- .addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new AddContainersToBeRemovedFromNMTransition())
- .addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
- .addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
- .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
- RMNodeEventType.SHUTDOWN,
- new DeactivateNodeTransition(NodeState.SHUTDOWN))
-
- //Transitions from REBOOTED state
- .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
- RMNodeEventType.RESOURCE_UPDATE,
- new UpdateNodeResourceWhenUnusableTransition())
-
- //Transitions from DECOMMISSIONED state
- .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
- RMNodeEventType.RESOURCE_UPDATE,
- new UpdateNodeResourceWhenUnusableTransition())
- .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
- RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new AddContainersToBeRemovedFromNMTransition())
-
- //Transitions from LOST state
- .addTransition(NodeState.LOST, NodeState.LOST,
- RMNodeEventType.RESOURCE_UPDATE,
- new UpdateNodeResourceWhenUnusableTransition())
- .addTransition(NodeState.LOST, NodeState.LOST,
- RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new AddContainersToBeRemovedFromNMTransition())
-
- //Transitions from UNHEALTHY state
- .addTransition(NodeState.UNHEALTHY,
- EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
- RMNodeEventType.STATUS_UPDATE,
- new StatusUpdateWhenUnHealthyTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
- RMNodeEventType.DECOMMISSION,
- new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
- .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
- RMNodeEventType.EXPIRE,
- new DeactivateNodeTransition(NodeState.LOST))
- .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
- RMNodeEventType.REBOOTING,
- new DeactivateNodeTransition(NodeState.REBOOTED))
- .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
- RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
- RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
- RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
- RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
- RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new AddContainersToBeRemovedFromNMTransition())
- .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
- RMNodeEventType.SHUTDOWN,
- new DeactivateNodeTransition(NodeState.SHUTDOWN))
-
- //Transitions from SHUTDOWN state
- .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
- RMNodeEventType.RESOURCE_UPDATE,
- new UpdateNodeResourceWhenUnusableTransition())
- .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
- RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new AddContainersToBeRemovedFromNMTransition())
-
- // create the topology tables
- .installTopology();
+ //Transitions from NEW state
+ .addTransition(NodeState.NEW, NodeState.RUNNING,
+ RMNodeEventType.STARTED, new AddNodeTransition())
+ .addTransition(NodeState.NEW, NodeState.NEW,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+
+ //Transitions from RUNNING state
+ .addTransition(NodeState.RUNNING,
+ EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
+ RMNodeEventType.STATUS_UPDATE,
+ new StatusUpdateWhenHealthyTransition())
+ .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
+ RMNodeEventType.DECOMMISSION,
+ new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+ .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.GRACEFUL_DECOMMISSION,
+ new DecommissioningNodeTransition(NodeState.RUNNING,
+ NodeState.DECOMMISSIONING))
+ .addTransition(NodeState.RUNNING, NodeState.LOST,
+ RMNodeEventType.EXPIRE,
+ new DeactivateNodeTransition(NodeState.LOST))
+ .addTransition(NodeState.RUNNING, NodeState.REBOOTED,
+ RMNodeEventType.REBOOTING,
+ new DeactivateNodeTransition(NodeState.REBOOTED))
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+ .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING),
+ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
+ .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
+
+ //Transitions from REBOOTED state
+ .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+
+ //Transitions from DECOMMISSIONED state
+ .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+
+ //Transitions from DECOMMISSIONING state
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+ RMNodeEventType.DECOMMISSION,
+ new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
+ RMNodeEventType.RECOMMISSION,
+ new RecommissionNodeTransition(NodeState.RUNNING))
+ .addTransition(NodeState.DECOMMISSIONING,
+ EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
+ RMNodeEventType.STATUS_UPDATE,
+ new StatusUpdateWhenHealthyTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.GRACEFUL_DECOMMISSION,
+ new DecommissioningNodeTransition(NodeState.DECOMMISSIONING,
+ NodeState.DECOMMISSIONING))
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.LOST,
+ RMNodeEventType.EXPIRE,
+ new DeactivateNodeTransition(NodeState.LOST))
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
+ RMNodeEventType.REBOOTING,
+ new DeactivateNodeTransition(NodeState.REBOOTED))
+
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+
+ // TODO (in YARN-3223) update resource when container finished.
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+ // TODO (in YARN-3223) update resource when container finished.
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+ .addTransition(NodeState.DECOMMISSIONING, EnumSet.of(
+ NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
+ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenRunningTransition())
+
+ //Transitions from LOST state
+ .addTransition(NodeState.LOST, NodeState.LOST,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.LOST, NodeState.LOST,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+
+ //Transitions from UNHEALTHY state
+ .addTransition(NodeState.UNHEALTHY,
+ EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
+ RMNodeEventType.STATUS_UPDATE,
+ new StatusUpdateWhenUnHealthyTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
+ RMNodeEventType.DECOMMISSION,
+ new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
+ .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONING,
+ RMNodeEventType.GRACEFUL_DECOMMISSION,
+ new DecommissioningNodeTransition(NodeState.UNHEALTHY,
+ NodeState.DECOMMISSIONING))
+ .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
+ RMNodeEventType.EXPIRE,
+ new DeactivateNodeTransition(NodeState.LOST))
+ .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
+ RMNodeEventType.REBOOTING,
+ new DeactivateNodeTransition(NodeState.REBOOTED))
+ .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY),
+ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+ .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
+
+ //Transitions from SHUTDOWN state
+ .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
+ RMNodeEventType.RESOURCE_UPDATE,
+ new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new AddContainersToBeRemovedFromNMTransition())
+
+ // create the topology tables
+ .installTopology();
private final StateMachine<NodeState, RMNodeEventType,
RMNodeEvent> stateMachine;
@@ -265,7 +314,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
-
+
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
this.containerAllocationExpirer = context.getContainerAllocationExpirer();
@@ -291,6 +340,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return httpPort;
}
+ // Test only: directly overwrites this node's httpPort field.
+ public void setHttpPort(int port) {
+ this.httpPort = port;
+ }
+
@Override
public NodeId getNodeID() {
return this.nodeId;
@@ -497,23 +551,35 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
metrics.decrNumShutdownNMs();
break;
default:
- LOG.debug("Unexpected previous node state");
+ LOG.debug("Unexpected previous node state");
}
}
+ // Moves one node from the unhealthy count to the active count, because
+ // nodes in DECOMMISSIONING are counted as active nodes.
+ // TODO: consider tracking DECOMMISSIONING nodes separately in metrics.
+ private void updateMetricsForGracefulDecommissionOnUnhealthyNode() {
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ metrics.incrNumActiveNodes();
+ metrics.decrNumUnhealthyNMs();
+ }
+
private void updateMetricsForDeactivatedNode(NodeState initialState,
NodeState finalState) {
ClusterMetrics metrics = ClusterMetrics.getMetrics();
switch (initialState) {
- case RUNNING:
- metrics.decrNumActiveNodes();
- break;
- case UNHEALTHY:
- metrics.decrNumUnhealthyNMs();
- break;
- default:
- LOG.debug("Unexpected inital state");
+ case RUNNING:
+ metrics.decrNumActiveNodes();
+ break;
+ case DECOMMISSIONING:
+ metrics.decrNumActiveNodes();
+ break;
+ case UNHEALTHY:
+ metrics.decrNumUnhealthyNMs();
+ break;
+ default:
+ LOG.debug("Unexpected inital state");
}
switch (finalState) {
@@ -608,10 +674,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
public static class ReconnectNodeTransition implements
- SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+ MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override
- public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
RMNode newNode = reconnectEvent.getReconnectedNode();
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
@@ -622,6 +688,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// No application running on the node, so send node-removal event with
// cleaning up old container info.
if (noRunningApps) {
+ if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+ // When node in decommissioning, and no running apps on this node,
+ // it will return as decommissioned state.
+ deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+ return NodeState.DECOMMISSIONED;
+ }
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@@ -652,6 +724,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
+
} else {
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
@@ -678,17 +751,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
.newInstance(newNode.getTotalCapability(), -1)));
}
+
}
+ return rmNode.getState();
}
private void handleNMContainerStatus(
List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
- List<ContainerStatus> containerStatuses =
- new ArrayList<ContainerStatus>();
- for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
- containerStatuses.add(createContainerStatus(nmContainerStatus));
+ if (nmContainerStatuses != null) {
+ List<ContainerStatus> containerStatuses =
+ new ArrayList<ContainerStatus>();
+ for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+ containerStatuses.add(createContainerStatus(nmContainerStatus));
+ }
+ rmnode.handleContainerStatus(containerStatuses);
}
- rmnode.handleContainerStatus(containerStatuses);
}
private ContainerStatus createContainerStatus(
@@ -770,31 +847,94 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- // Inform the scheduler
- rmNode.nodeUpdateQueue.clear();
- // If the current state is NodeState.UNHEALTHY
- // Then node is already been removed from the
- // Scheduler
- NodeState initialState = rmNode.getState();
- if (!initialState.equals(NodeState.UNHEALTHY)) {
- rmNode.context.getDispatcher().getEventHandler()
- .handle(new NodeRemovedSchedulerEvent(rmNode));
+ RMNodeImpl.deactivateNode(rmNode, finalState);
+ }
+ }
+
+ /**
+ * Deactivate a node (e.g. DECOMMISSIONED, LOST, REBOOTED or SHUTDOWN).
+ * @param rmNode node to move from the active to the inactive RM node map
+ * @param finalState state the node is deactivated into (also used for metrics)
+ */
+ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
+
+ reportNodeUnusable(rmNode, finalState);
+
+ // Move the node from the active to the inactive node map
+ rmNode.context.getRMNodes().remove(rmNode.nodeId);
+ LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ + finalState);
+ rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ }
+
+ /**
+ * Report the node as UNUSABLE to the scheduler and NodesListManager; update metrics.
+ * @param rmNode node being reported; its current state is taken as the initial state
+ * @param finalState state the node is moving to (used for metrics)
+ */
+ public static void reportNodeUnusable(RMNodeImpl rmNode,
+ NodeState finalState) {
+ // Drop any queued container updates for this node
+ rmNode.nodeUpdateQueue.clear();
+ // If the current state is UNHEALTHY, the node has already been removed
+ // from the scheduler, so the removal event is only sent for other
+ // states.
+ NodeState initialState = rmNode.getState();
+ if (!initialState.equals(NodeState.UNHEALTHY)) {
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeRemovedSchedulerEvent(rmNode));
+ }
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_UNUSABLE, rmNode));
+
+ // Update cluster metrics for the initialState -> finalState change
+ rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
+ }
+
+ /**
+ * The transition to put node in decommissioning state.
+ */
+ public static class DecommissioningNodeTransition
+ implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+ private final NodeState initState;
+ private final NodeState finalState;
+
+ public DecommissioningNodeTransition(NodeState initState,
+ NodeState finalState) {
+ this.initState = initState;
+ this.finalState = finalState;
+ }
+
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
+ if (initState.equals(NodeState.UNHEALTHY)) {
+ rmNode.updateMetricsForGracefulDecommissionOnUnhealthyNode();
}
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_UNUSABLE, rmNode));
+ // TODO (in YARN-3223) Keep NM's available resource to be 0
+ }
+ }
+
+ public static class RecommissionNodeTransition
+ implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
- // Deactivate the node
- rmNode.context.getRMNodes().remove(rmNode.nodeId);
- LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
- + finalState);
- rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ private final NodeState finalState;
+ public RecommissionNodeTransition(NodeState finalState) {
+ this.finalState = finalState;
+ }
- //Update the metrics
- rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
+ @Override
+ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+ LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
+ "recommissioned back to RUNNING.");
+ // TODO handle NM resource resume in YARN-3223.
}
}
+ /**
+ * Status update transition when node is healthy.
+ */
public static class StatusUpdateWhenHealthyTransition implements
MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
@Override
@@ -805,25 +945,44 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
- NodeHealthStatus remoteNodeHealthStatus =
+ NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
+ NodeState initialState = rmNode.getState();
+ boolean isNodeDecommissioning =
+ initialState.equals(NodeState.DECOMMISSIONING);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
- LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
- + remoteNodeHealthStatus.getHealthReport());
- rmNode.nodeUpdateQueue.clear();
- // Inform the scheduler
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeRemovedSchedulerEvent(rmNode));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_UNUSABLE, rmNode));
- // Update metrics
- rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
- NodeState.UNHEALTHY);
- return NodeState.UNHEALTHY;
+ LOG.info("Node " + rmNode.nodeId +
+ " reported UNHEALTHY with details: " +
+ remoteNodeHealthStatus.getHealthReport());
+ // if a node in decommissioning receives an unhealthy report,
+ // it will keep decommissioning.
+ if (isNodeDecommissioning) {
+ return NodeState.DECOMMISSIONING;
+ } else {
+ reportNodeUnusable(rmNode, NodeState.UNHEALTHY);
+ return NodeState.UNHEALTHY;
+ }
+ }
+ if (isNodeDecommissioning) {
+ List<ApplicationId> runningApps = rmNode.getRunningApps();
+
+ List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
+
+ // no running (and keeping alive) app on this node, get it
+ // decommissioned.
+ // TODO may need to check no container is being scheduled on this node
+ // as well.
+ if ((runningApps == null || runningApps.size() == 0)
+ && (keepAliveApps == null || keepAliveApps.size() == 0)) {
+ RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
+ return NodeState.DECOMMISSIONED;
+ }
+
+ // TODO (in YARN-3223) if node in decommissioning, get node resource
+ // updated if container get finished (keep available resource to be 0)
}
rmNode.handleContainerStatus(statusEvent.getContainers());
@@ -848,7 +1007,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
statusEvent.getKeepAliveAppIds());
}
- return NodeState.RUNNING;
+ return initialState;
}
}
@@ -857,11 +1016,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
- RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
+ RMNodeStatusEvent statusEvent = (RMNodeStatusEvent)event;
// Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
- NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
+ NodeHealthStatus remoteNodeHealthStatus =
+ statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(
remoteNodeHealthStatus.getLastHealthReportTime());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 61c6166..a6e1575 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -29,7 +29,9 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -75,7 +77,7 @@ import org.mockito.stubbing.Answer;
public class TestRMNodeTransitions {
RMNodeImpl node;
-
+
private RMContext rmContext;
private YarnScheduler scheduler;
@@ -168,6 +170,42 @@ public class TestRMNodeTransitions {
return event;
}
+  /**
+   * Builds a mocked STATUS_UPDATE event from a healthy node.
+   *
+   * @param keepAliveAppIds value returned by the mocked
+   *     {@code getKeepAliveAppIds()}; may be {@code null}
+   * @return the mocked event
+   */
+  private RMNodeStatusEvent getMockHealthyStatusEvent(
+      List<ApplicationId> keepAliveAppIds) {
+    NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
+
+    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
+    // Boolean.TRUE instead of the deprecated Boolean(boolean) constructor.
+    doReturn(Boolean.TRUE).when(healthStatus).getIsNodeHealthy();
+
+    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
+    doReturn(healthStatus).when(event).getNodeHealthStatus();
+    doReturn(response).when(event).getLatestResponse();
+    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    doReturn(keepAliveAppIds).when(event).getKeepAliveAppIds();
+    return event;
+  }
+
+  /** Healthy status update that still reports one running application. */
+  private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
+    return getMockHealthyStatusEvent(getAppIdList());
+  }
+
+  /** Returns a fresh mutable list holding the single application id (0, 0). */
+  private List<ApplicationId> getAppIdList() {
+    List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
+    appIdList.add(BuilderUtils.newApplicationId(0, 0));
+    return appIdList;
+  }
+
+  /** Healthy status update whose keep-alive application list is null. */
+  private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
+    return getMockHealthyStatusEvent(null);
+  }
+
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
@@ -195,7 +233,33 @@ public class TestRMNodeTransitions {
*/
verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));
}
-
+
+  /**
+   * A DECOMMISSIONING node stays DECOMMISSIONING while its heartbeats still
+   * report a running application, and becomes DECOMMISSIONED once a
+   * heartbeat reports none.
+   */
+  @Test
+  public void testStatusUpdateOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    // Heartbeat still carrying a running app: state must not change.
+    node.handle(getMockRMNodeStatusEventWithRunningApps());
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    // Heartbeat without running apps: decommissioning completes.
+    node.handle(getMockRMNodeStatusEventWithoutRunningApps());
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+  /** RECOMMISSION brings a DECOMMISSIONING node back to RUNNING. */
+  @Test
+  public void testRecommissionNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    RMNodeEvent recommission =
+        new RMNodeEvent(node.getNodeID(), RMNodeEventType.RECOMMISSION);
+    node.handle(recommission);
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+  }
+
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
@@ -253,9 +317,9 @@ public class TestRMNodeTransitions {
Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
.getContainerId());
Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
- .getContainerId());
+ .getContainerId());
}
-
+
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
@@ -292,7 +356,7 @@ public class TestRMNodeTransitions {
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals(0, node.getQueueSize());
}
-
+
@Test
public void testRunningExpire() {
RMNodeImpl node = getRunningNode();
@@ -375,7 +439,7 @@ public class TestRMNodeTransitions {
initialRebooted, cm.getNumRebootedNMs());
Assert.assertEquals(NodeState.LOST, node.getState());
}
-
+
@Test
public void testUnhealthyExpireForSchedulerRemove() {
RMNodeImpl node = getUnhealthyNode();
@@ -408,6 +472,28 @@ public class TestRMNodeTransitions {
}
@Test
+  public void testDecommissionOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    final int activeBefore = metrics.getNumActiveNMs();
+    final int lostBefore = metrics.getNumLostNMs();
+    final int unhealthyBefore = metrics.getUnhealthyNMs();
+    final int decommissionedBefore = metrics.getNumDecommisionedNMs();
+    final int rebootedBefore = metrics.getNumRebootedNMs();
+
+    // A hard DECOMMISSION on a DECOMMISSIONING node finishes the process:
+    // one fewer active node and one more decommissioned node.
+    node.handle(
+        new RMNodeEvent(node.getNodeID(), RMNodeEventType.DECOMMISSION));
+
+    Assert.assertEquals("Active Nodes", activeBefore - 1,
+        metrics.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", lostBefore, metrics.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes", unhealthyBefore,
+        metrics.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes", decommissionedBefore + 1,
+        metrics.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes", rebootedBefore,
+        metrics.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+ @Test
public void testUnhealthyDecommission() {
RMNodeImpl node = getUnhealthyNode();
ClusterMetrics cm = ClusterMetrics.getMetrics();
@@ -429,6 +515,30 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
}
+  // Gracefully decommissioning an unhealthy node moves it to DECOMMISSIONING,
+  // where it is counted as active again rather than unhealthy.
+  @Test
+  public void testUnhealthyDecommissioning() {
+    RMNodeImpl node = getUnhealthyNode();
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    final int activeBefore = metrics.getNumActiveNMs();
+    final int lostBefore = metrics.getNumLostNMs();
+    final int unhealthyBefore = metrics.getUnhealthyNMs();
+    final int decommissionedBefore = metrics.getNumDecommisionedNMs();
+    final int rebootedBefore = metrics.getNumRebootedNMs();
+
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.GRACEFUL_DECOMMISSION));
+
+    Assert.assertEquals("Active Nodes", activeBefore + 1,
+        metrics.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", lostBefore, metrics.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes", unhealthyBefore - 1,
+        metrics.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes", decommissionedBefore,
+        metrics.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes", rebootedBefore,
+        metrics.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+  }
+
@Test
public void testRunningRebooting() {
RMNodeImpl node = getRunningNode();
@@ -567,6 +677,14 @@ public class TestRMNodeTransitions {
return node;
}
+  /**
+   * Returns a node that was RUNNING and then received GRACEFUL_DECOMMISSION,
+   * after asserting that it is now DECOMMISSIONING.
+   */
+  private RMNodeImpl getDecommissioningNode() {
+    RMNodeImpl decommissioning = getRunningNode();
+    RMNodeEvent gracefulDecommission = new RMNodeEvent(
+        decommissioning.getNodeID(), RMNodeEventType.GRACEFUL_DECOMMISSION);
+    decommissioning.handle(gracefulDecommission);
+    Assert.assertEquals(NodeState.DECOMMISSIONING, decommissioning.getState());
+    return decommissioning;
+  }
+
private RMNodeImpl getUnhealthyNode() {
RMNodeImpl node = getRunningNode();
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
@@ -577,20 +695,19 @@ public class TestRMNodeTransitions {
return node;
}
-
private RMNodeImpl getNewNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
return node;
}
-
+
private RMNodeImpl getNewNode(Resource capability) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
capability, null);
return node;
}
-
+
private RMNodeImpl getRebootedNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4);
@@ -650,7 +767,39 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
-
+
+  /**
+   * Reconnects of a DECOMMISSIONING node keep it DECOMMISSIONING while a
+   * running application is reported, and complete decommissioning once no
+   * application is reported.
+   */
+  @Test
+  public void testReconnectOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+
+    // Reconnect event with running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
+        getAppIdList(), null));
+    // still decommissioning
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    // Reconnect event without any running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
+  /**
+   * Same scenario as testReconnectOnDecommissioningNode, but the HTTP port is
+   * changed before each reconnect.
+   */
+  @Test
+  public void testReconnectWithNewPortOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Random r = new Random();
+    // Two independent nextInt(10000) calls could return the same value, so
+    // the second port is derived from the first to guarantee it differs.
+    int firstPort = 1 + r.nextInt(10000);
+    node.setHttpPort(firstPort);
+    // NOTE(review): the "new" node passed to RMNodeReconnectEvent is the same
+    // object as node, so both sides always report the same port; confirm
+    // this exercises the intended port-change path.
+    // Reconnect event with running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
+        getAppIdList(), null));
+    // still decommissioning
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+
+    node.setHttpPort(firstPort + 1 + r.nextInt(10000));
+    // Reconnect event without any running app
+    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
+    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
+  }
+
@Test
public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode();
@@ -658,18 +807,23 @@ public class TestRMNodeTransitions {
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
- ResourceOption.newInstance(Resource.newInstance(2048, 2),
+ ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
-
+
Assert.assertEquals(NodeState.RUNNING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
-
+
+  /**
+   * GRACEFUL_DECOMMISSION on a RUNNING node yields DECOMMISSIONING; the
+   * state assertion is performed inside getDecommissioningNode().
+   */
+  @Test
+  public void testDecommissioningOnRunningNode() {
+    getDecommissioningNode();
+  }
+
@Test
public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
@@ -682,10 +836,10 @@ public class TestRMNodeTransitions {
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
-
+
Assert.assertEquals(NodeState.NEW, node.getState());
}
-
+
@Test
public void testResourceUpdateOnRebootedNode() {
RMNodeImpl node = getRebootedNode();
@@ -702,6 +856,18 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}
+  // An unhealthy status report must not move a DECOMMISSIONING node out of
+  // the DECOMMISSIONING state.
+  @Test
+  public void testDecommissioningUnhealthy() {
+    RMNodeImpl node = getDecommissioningNode();
+    NodeHealthStatus sick = NodeHealthStatus.newInstance(false, "sick",
+        System.currentTimeMillis());
+    RMNodeStatusEvent sickUpdate = new RMNodeStatusEvent(node.getNodeID(),
+        sick, new ArrayList<ContainerStatus>(), null, null);
+    node.handle(sickUpdate);
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+  }
+
@Test
public void testReconnnectUpdate() {
final String nmVersion1 = "nm version 1";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bc913a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
index b70fdc1..458b240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestNodesPage.java
@@ -43,8 +43,7 @@ public class TestNodesPage {
final int numberOfNodesPerRack = 8;
// The following is because of the way TestRMWebApp.mockRMContext creates
// nodes.
- final int numberOfLostNodesPerRack = numberOfNodesPerRack
- / NodeState.values().length;
+ final int numberOfLostNodesPerRack = 1;
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value.