You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/09/28 18:53:16 UTC

[hadoop] branch branch-3.2 updated: YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger (ebadger).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 947b0a15 YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger (ebadger).
947b0a15 is described below

commit 947b0a154a6be7611dc623da66be6133a54d9b44
Author: Eric E Payne <ep...@apache.org>
AuthorDate: Mon Sep 28 18:50:44 2020 +0000

    YARN-9809. Added node manager health status to resource manager registration call. Contributed by Eric Badger (ebadger).
---
 .../apache/hadoop/util/NodeHealthScriptRunner.java | 18 +++++--
 .../hadoop/util/TestNodeHealthScriptRunner.java    |  2 +-
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  7 +++
 .../src/main/resources/yarn-default.xml            |  7 +++
 .../RegisterNodeManagerRequest.java                | 19 ++++++-
 .../impl/pb/RegisterNodeManagerRequestPBImpl.java  | 41 ++++++++++++++-
 .../proto/yarn_server_common_service_protos.proto  |  1 +
 .../yarn/server/nodemanager/NodeManager.java       |  5 +-
 .../server/nodemanager/NodeStatusUpdaterImpl.java  |  3 +-
 .../yarn/server/nodemanager/TestEventFlow.java     |  5 ++
 .../containermanager/BaseContainerManagerTest.java |  7 ++-
 .../resourcemanager/ResourceTrackerService.java    |  5 +-
 .../server/resourcemanager/rmnode/RMNodeImpl.java  | 56 +++++++++++++++++----
 .../resourcemanager/rmnode/RMNodeStartedEvent.java |  9 +++-
 .../hadoop/yarn/server/resourcemanager/MockNM.java | 22 ++++++++
 .../hadoop/yarn/server/resourcemanager/MockRM.java |  7 ++-
 .../yarn/server/resourcemanager/NodeManager.java   |  3 +-
 .../resourcemanager/TestRMNodeTransitions.java     | 54 +++++++++++++++++---
 .../resourcemanager/TestResourceManager.java       | 22 +++++---
 .../TestResourceTrackerService.java                |  6 +++
 .../TestRMAppLogAggregationStatus.java             |  7 ++-
 .../resourcetracker/TestNMExpiry.java              |  7 +++
 .../resourcetracker/TestNMReconnect.java           |  7 +++
 .../scheduler/TestAbstractYarnScheduler.java       |  6 +++
 .../scheduler/TestSchedulerHealth.java             | 18 ++++---
 .../scheduler/capacity/TestCapacityScheduler.java  | 58 +++++++++++++++-------
 .../scheduler/fair/TestFairScheduler.java          | 15 ++++--
 .../scheduler/fifo/TestFifoScheduler.java          | 24 ++++++---
 .../webapp/TestRMWebServicesNodes.java             |  5 +-
 29 files changed, 367 insertions(+), 79 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java
index 1d2e4b5..66d6bcd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeHealthScriptRunner.java
@@ -71,6 +71,8 @@ public class NodeHealthScriptRunner extends AbstractService {
   private long lastReportedTime;
 
   private TimerTask timer;
+
+  private boolean runBeforeStartup;
   
   private enum HealthCheckerExitStatus {
     SUCCESS,
@@ -191,7 +193,7 @@ public class NodeHealthScriptRunner extends AbstractService {
   }
 
   public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout,
-      String[] scriptArgs) {
+      String[] scriptArgs, boolean runBeforeStartup) {
     super(NodeHealthScriptRunner.class.getName());
     this.lastReportedTime = System.currentTimeMillis();
     this.isHealthy = true;
@@ -200,6 +202,7 @@ public class NodeHealthScriptRunner extends AbstractService {
     this.intervalTime = chkInterval;
     this.scriptTimeout = timeout;
     this.timer = new NodeHealthMonitorExecutor(scriptArgs);
+    this.runBeforeStartup = runBeforeStartup;
   }
 
   /*
@@ -217,9 +220,16 @@ public class NodeHealthScriptRunner extends AbstractService {
   @Override
   protected void serviceStart() throws Exception {
     nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
-    // Start the timer task immediately and
-    // then periodically at interval time.
-    nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+    long delay = 0;
+    if (runBeforeStartup) {
+      // Start the timer task immediately and wait for it to return.
+      timer.run();
+      delay = intervalTime;
+    }
+
+    // Set the script to run periodically at interval time.
+    nodeHealthScriptScheduler.scheduleAtFixedRate(timer, delay,
+        intervalTime);
     super.serviceStart();
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java
index 2748c0b..86b9f46 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNodeHealthScriptRunner.java
@@ -100,7 +100,7 @@ public class TestNodeHealthScriptRunner {
     writeNodeHealthScriptFile(normalScript, true);
     NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner(
             nodeHealthscriptFile.getAbsolutePath(),
-            500, 1000, new String[] {});
+            500, 1000, new String[] {}, true);
     nodeHealthScriptRunner.init(conf);
     TimerTask timerTask = nodeHealthScriptRunner.getTimerTask();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e40861d..ba440e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1928,6 +1928,13 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "health-checker.script.timeout-ms";
   public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 
     2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
+
+  /** Whether or not to run the node health script before the NM
+   *  starts up.*/
+  public static final String NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
+      NM_PREFIX + "health-checker.run-before-startup";
+  public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
+      false;
   
   /** The health check script to run.*/
   public static final String NM_HEALTH_CHECK_SCRIPT_PATH = 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index acb7015..7236e4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1596,6 +1596,13 @@
   </property>
 
   <property>
+    <description>Whether or not to run the node health script
+    before the NM starts up.</description>
+    <name>yarn.nodemanager.health-checker.run-before-startup</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>The health check script to run.</description>
     <name>yarn.nodemanager.health-checker.script.path</name>
     <value></value>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index acec16f..54b3915 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
 
 public abstract class RegisterNodeManagerRequest {
@@ -53,14 +54,15 @@ public abstract class RegisterNodeManagerRequest {
       Resource physicalResource) {
     return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
         containerStatuses, runningApplications, nodeLabels, physicalResource,
-        null);
+        null, null);
   }
 
   public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
       int httpPort, Resource resource, String nodeManagerVersionId,
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
-      Resource physicalResource, Set<NodeAttribute> nodeAttributes) {
+      Resource physicalResource, Set<NodeAttribute> nodeAttributes,
+      NodeStatus nodeStatus) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -72,6 +74,7 @@ public abstract class RegisterNodeManagerRequest {
     request.setNodeLabels(nodeLabels);
     request.setPhysicalResource(physicalResource);
     request.setNodeAttributes(nodeAttributes);
+    request.setNodeStatus(nodeStatus);
     return request;
   }
   
@@ -133,4 +136,16 @@ public abstract class RegisterNodeManagerRequest {
   public abstract Set<NodeAttribute> getNodeAttributes();
 
   public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
+
+  /**
+   * Get the status of the node.
+   * @return The status of the node.
+   */
+  public abstract NodeStatus getNodeStatus();
+
+  /**
+   * Set the status of the node.
+   * @param nodeStatus The status of the node.
+   */
+  public abstract void setNodeStatus(NodeStatus nodeStatus);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index 317f8ab..1ad4149 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -45,13 +45,16 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregation
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-    
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+
 public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
   RegisterNodeManagerRequestProto.Builder builder = null;
@@ -68,6 +71,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
 
   /** Physical resources in the node. */
   private Resource physicalResource = null;
+  private NodeStatus nodeStatus;
 
   public RegisterNodeManagerRequestPBImpl() {
     builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -118,6 +122,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     if (this.physicalResource != null) {
       builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
     }
+    if (this.nodeStatus != null) {
+      builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
+    }
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
@@ -360,6 +367,29 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
   }
 
   @Override
+  public synchronized NodeStatus getNodeStatus() {
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.nodeStatus != null) {
+      return this.nodeStatus;
+    }
+    if (!p.hasNodeStatus()) {
+      return null;
+    }
+    this.nodeStatus = convertFromProtoFormat(p.getNodeStatus());
+    return this.nodeStatus;
+  }
+
+  @Override
+  public synchronized void setNodeStatus(NodeStatus pNodeStatus) {
+    maybeInitBuilder();
+    if (pNodeStatus == null) {
+      builder.clearNodeStatus();
+    }
+    this.nodeStatus = pNodeStatus;
+  }
+
+
+  @Override
   public int hashCode() {
     return getProto().hashCode();
   }
@@ -533,4 +563,13 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     }
     this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
+
+  private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto s) {
+    return new NodeStatusPBImpl(s);
+  }
+
+  private NodeStatusProto convertToProtoFormat(NodeStatus s) {
+    return ((NodeStatusPBImpl)s).getProto();
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index fb74237..54b097e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -73,6 +73,7 @@ message RegisterNodeManagerRequestProto {
   optional ResourceProto physicalResource = 9;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
   optional NodeAttributesProto nodeAttributes = 11;
+  optional NodeStatusProto nodeStatus = 12;
 }
 
 message RegisterNodeManagerResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 6edde64..62ae7b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -367,8 +367,11 @@ public class NodeManager extends CompositeService
         YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
     String[] scriptArgs = conf.getStrings(
         YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});
+    boolean runBeforeStartup = conf.getBoolean(
+        YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,
+        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);
     return new NodeHealthScriptRunner(nodeHealthScript,
-        nmCheckintervalTime, scriptTimeout, scriptArgs);
+        nmCheckintervalTime, scriptTimeout, scriptArgs, runBeforeStartup);
   }
 
   @VisibleForTesting
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 682937d..e50f713 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -390,10 +390,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     // during RM recovery
     synchronized (this.context) {
       List<NMContainerStatus> containerReports = getNMContainerStatuses();
+      NodeStatus nodeStatus = getNodeStatus(0);
       RegisterNodeManagerRequest request =
           RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
               nodeManagerVersionId, containerReports, getRunningApplications(),
-              nodeLabels, physicalResource, nodeAttributes);
+              nodeLabels, physicalResource, nodeAttributes, nodeStatus);
 
       if (containerReports != null) {
         LOG.info("Registering with RM using containers :" + containerReports);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index 54e090a..e47bdba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.mockito.Mockito.mock;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -133,6 +135,9 @@ public class TestEventFlow {
         new DummyContainerManager(context, exec, del, nodeStatusUpdater,
           metrics, dirsHandler);
     nodeStatusUpdater.init(conf);
+    NodeResourceMonitorImpl nodeResourceMonitor = mock(
+        NodeResourceMonitorImpl.class);
+    ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
     ((NMContext)context).setContainerManager(containerManager);
     nodeStatusUpdater.start();
     ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 493aa4c..5b1d84c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,9 +157,12 @@ public abstract class BaseContainerManagerTest {
   protected NodeHealthCheckerService nodeHealthChecker;
   protected LocalDirsHandlerService dirsHandler;
   protected final long DUMMY_RM_IDENTIFIER = 1234;
+  private NodeHealthCheckerService nodeHealthCheckerService = mock(NodeHealthCheckerService.class);
+  private NodeResourceMonitorImpl nodeResourceMonitor = mock(
+      NodeResourceMonitorImpl.class);
 
   protected NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
-      context, new AsyncDispatcher(), null, metrics) {
+      context, new AsyncDispatcher(), nodeHealthCheckerService, metrics) {
     @Override
     protected ResourceTracker getRMClient() {
       return new LocalRMInterface();
@@ -223,6 +227,7 @@ public abstract class BaseContainerManagerTest {
     nodeHealthChecker.init(conf);
     containerManager = createContainerManager(delSrvc);
     ((NMContext)context).setContainerManager(containerManager);
+    ((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);
     nodeStatusUpdater.init(conf);
     containerManager.init(conf);
     nodeStatusUpdater.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 8963d72..48401e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -333,6 +333,7 @@ public class ResourceTrackerService extends AbstractService implements
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
     Resource physicalResource = request.getPhysicalResource();
+    NodeStatus nodeStatus = request.getNodeStatus();
 
     RegisterNodeManagerResponse response = recordFactory
         .newRecordInstance(RegisterNodeManagerResponse.class);
@@ -409,7 +410,7 @@ public class ResourceTrackerService extends AbstractService implements
     if (oldNode == null) {
       RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
           request.getNMContainerStatuses(),
-          request.getRunningApplications());
+          request.getRunningApplications(), nodeStatus);
       if (request.getLogAggregationReportsForApps() != null
           && !request.getLogAggregationReportsForApps().isEmpty()) {
         if (LOG.isDebugEnabled()) {
@@ -445,7 +446,7 @@ public class ResourceTrackerService extends AbstractService implements
 
         this.rmContext.getRMNodes().put(nodeId, rmNode);
         this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMNodeStartedEvent(nodeId, null, null));
+            .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
       } else {
         // Reset heartbeat ID since node just restarted.
         oldNode.resetLastNodeHeartBeatResponse();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 7fe53c1..464aaae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -199,7 +200,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
       //Transitions from NEW state
-      .addTransition(NodeState.NEW, NodeState.RUNNING,
+      .addTransition(NodeState.NEW,
+          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
           RMNodeEventType.STARTED, new AddNodeTransition())
       .addTransition(NodeState.NEW, NodeState.NEW,
           RMNodeEventType.RESOURCE_UPDATE,
@@ -685,7 +687,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
-    metrics.incrNumActiveNodes();
 
     switch (previousNodeState) {
     case LOST:
@@ -827,10 +828,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   public static class AddNodeTransition implements
-      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+      MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
 
     @Override
-    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+    public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
       RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
       List<NMContainerStatus> containers = null;
@@ -848,8 +849,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         if (previousRMNode != null) {
           ClusterMetrics.getMetrics().decrDecommisionedNMs();
         }
-        // Increment activeNodes explicitly because this is a new node.
-        ClusterMetrics.getMetrics().incrNumActiveNodes();
         containers = startEvent.getNMContainerStatuses();
         if (containers != null && !containers.isEmpty()) {
           for (NMContainerStatus container : containers) {
@@ -866,17 +865,35 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         }
       }
 
-      rmNode.context.getDispatcher().getEventHandler()
-        .handle(new NodeAddedSchedulerEvent(rmNode, containers));
-      rmNode.context.getDispatcher().getEventHandler().handle(
-        new NodesListManagerEvent(
-            NodesListManagerEventType.NODE_USABLE, rmNode));
+      NodeState nodeState;
+      NodeStatus nodeStatus = startEvent.getNodeStatus();
+
+      if (nodeStatus == null) {
+        nodeState = NodeState.RUNNING;
+        reportNodeRunning(rmNode, containers);
+      } else {
+        RMNodeStatusEvent rmNodeStatusEvent =
+            new RMNodeStatusEvent(nodeId, nodeStatus);
+
+        NodeHealthStatus nodeHealthStatus =
+            updateRMNodeFromStatusEvents(rmNode, rmNodeStatusEvent);
+
+        if (nodeHealthStatus.getIsNodeHealthy()) {
+          nodeState = NodeState.RUNNING;
+          reportNodeRunning(rmNode, containers);
+        } else {
+          nodeState = NodeState.UNHEALTHY;
+          reportNodeUnusable(rmNode, nodeState);
+        }
+      }
+
       List<LogAggregationReport> logAggregationReportsForApps =
           startEvent.getLogAggregationReportsForApps();
       if (logAggregationReportsForApps != null
           && !logAggregationReportsForApps.isEmpty()) {
         rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
       }
+      return nodeState;
     }
   }
 
@@ -1088,6 +1105,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   /**
+   * Report node is RUNNING.
+   * @param rmNode
+   * @param containers
+   */
+  public static void reportNodeRunning(RMNodeImpl rmNode,
+      List<NMContainerStatus> containers) {
+    rmNode.context.getDispatcher().getEventHandler()
+        .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+    rmNode.context.getDispatcher().getEventHandler().handle(
+        new NodesListManagerEvent(
+        NodesListManagerEventType.NODE_USABLE, rmNode));
+    // Increment activeNodes explicitly because this is a new node.
+    ClusterMetrics.getMetrics().incrNumActiveNodes();
+  }
+
+  /**
    * Report node is UNUSABLE and update metrics.
    * @param rmNode
    * @param finalState
@@ -1278,6 +1311,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         // notifiers get update metadata because they will very likely query it
         // upon notification
         // Update metrics
+        ClusterMetrics.getMetrics().incrNumActiveNodes();
         rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY);
         return NodeState.RUNNING;
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
index 3976994..fbbda9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
@@ -24,19 +24,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
 public class RMNodeStartedEvent extends RMNodeEvent {
 
+  private final NodeStatus nodeStatus;
   private List<NMContainerStatus> containerStatuses;
   private List<ApplicationId> runningApplications;
   private List<LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStartedEvent(NodeId nodeId,
       List<NMContainerStatus> containerReports,
-      List<ApplicationId> runningApplications) {
+      List<ApplicationId> runningApplications, NodeStatus nodeStatus) {
     super(nodeId, RMNodeEventType.STARTED);
     this.containerStatuses = containerReports;
     this.runningApplications = runningApplications;
+    this.nodeStatus = nodeStatus;
   }
 
   public List<NMContainerStatus> getNMContainerStatuses() {
@@ -55,4 +58,8 @@ public class RMNodeStartedEvent extends RMNodeEvent {
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
+
+  public NodeStatus getNodeStatus() {
+    return nodeStatus;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 2e28395..04948aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -185,6 +188,17 @@ public class MockNM {
       req.setNodeLabels(nodeLabels);
     }
 
+    NodeStatus status = Records.newRecord(NodeStatus.class);
+    status.setResponseId(0);
+    status.setNodeId(nodeId);
+    status.setContainersStatuses(new ArrayList<>(containerStats.values()));
+    NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
+    healthStatus.setHealthReport("");
+    healthStatus.setIsNodeHealthy(true);
+    healthStatus.setLastHealthReportTime(1);
+    status.setNodeHealthStatus(healthStatus);
+    req.setNodeStatus(status);
+
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
     this.currentContainerTokenMasterKey =
@@ -324,4 +338,12 @@ public class MockNM {
   public void setResponseId(int id) {
     this.responseId = id;
   }
+
+  public static NodeStatus createMockNodeStatus() {
+    NodeStatus mockNodeStatus = mock(NodeStatus.class);
+    NodeHealthStatus mockNodeHealthStatus = mock(NodeHealthStatus.class);
+    when(mockNodeStatus.getNodeHealthStatus()).thenReturn(mockNodeHealthStatus);
+    when(mockNodeHealthStatus.getIsNodeHealthy()).thenReturn(true);
+    return mockNodeStatus;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 9bbd148..9b33c83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@@ -960,7 +963,9 @@ public class MockRM extends ResourceManager {
   public void sendNodeStarted(MockNM nm) throws Exception {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());
-    node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null,
+        mockNodeStatus));
     drainEventsImplicitly();
   }
   
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
index 41ae7b8..5af33e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
@@ -96,7 +96,7 @@ public class NodeManager implements ContainerManagementProtocol {
   
   public NodeManager(String hostName, int containerManagerPort, int httpPort,
       String rackName, Resource capability,
-      ResourceManager resourceManager)
+      ResourceManager resourceManager, NodeStatus nodeStatus)
       throws IOException, YarnException {
     this.containerManagerAddress = hostName + ":" + containerManagerPort;
     this.nodeHttpAddress = hostName + ":" + httpPort;
@@ -111,6 +111,7 @@ public class NodeManager implements ContainerManagementProtocol {
     request.setResource(capability);
     request.setNodeId(this.nodeId);
     request.setNMVersion(YarnVersionInfo.getVersion());
+    request.setNodeStatus(nodeStatus);
     resourceTrackerService.registerNodeManager(request);
     this.resourceManager = resourceManager;
     resourceManager.getResourceScheduler().getNodeReport(this.nodeId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 67decaf..d9a1812 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -216,8 +217,9 @@ public class TestRMNodeTransitions {
 
   @Test (timeout = 5000)
   public void testExpiredContainer() {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     // Start the node
-    node.handle(new RMNodeStartedEvent(null, null, null));
+    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
@@ -279,12 +281,13 @@ public class TestRMNodeTransitions {
 
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     //Start the node
-    node.handle(new RMNodeStartedEvent(null, null, null));
+    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
     
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
-    node2.handle(new RMNodeStartedEvent(null, null, null));
+    node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
 
     ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
     ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
@@ -340,8 +343,9 @@ public class TestRMNodeTransitions {
 
   @Test (timeout = 5000)
   public void testStatusChange(){
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     //Start the node
-    node.handle(new RMNodeStartedEvent(null, null, null));
+    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
     //Add info to the queue first
     node.setNextHeartBeat(false);
 
@@ -607,6 +611,33 @@ public class TestRMNodeTransitions {
   }
 
   @Test
+  public void testAddUnhealthyNode() {
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+
+    NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
+        System.currentTimeMillis());
+    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
+        new ArrayList<>(), null, status, null, null, null);
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+        nodeStatus));
+
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy + 1, cm.getUnhealthyNMs());
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
+  }
+
+  @Test
   public void testNMShutdown() {
     RMNodeImpl node = getRunningNode();
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN));
@@ -711,7 +742,9 @@ public class TestRMNodeTransitions {
     Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
         capability, nmVersion);
-    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+        mockNodeStatus));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
   }
@@ -762,7 +795,9 @@ public class TestRMNodeTransitions {
     Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
         null, capability, null);
-    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+        mockNodeStatus));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
     Assert.assertEquals(NodeState.REBOOTED, node.getState());
@@ -778,7 +813,9 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+        mockNodeStatus));
     Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",
@@ -1074,8 +1111,9 @@ public class TestRMNodeTransitions {
 
   @Test
   public void testForHandlingDuplicatedCompltedContainers() {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     // Start the node
-    node.handle(new RMNodeStartedEvent(null, null, null));
+    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
     // Add info to the queue first
     node.setNextHeartBeat(false);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
index efa7496..e5f75d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
@@ -26,6 +27,7 @@ import java.util.Collection;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -82,12 +84,13 @@ public class TestResourceManager {
 
   private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
       registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, Resource capability) throws IOException,
+      String rackName, Resource capability, NodeStatus nodeStatus)
+      throws IOException,
           YarnException {
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = 
         new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
             hostName, containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+            resourceManager, nodeStatus);
     NodeAddedSchedulerEvent nodeAddEvent1 = 
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
@@ -103,26 +106,30 @@ public class TestResourceManager {
         
     final int memory = 4 * 1024;
     final int vcores = 4;
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     
     // Register node1
     String host1 = "host1";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
       registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(memory, vcores));
+          Resources.createResource(memory, vcores), mockNodeStatus);
     
     // Register node2
     String host2 = "host2";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm2 = 
       registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(memory/2, vcores/2));
+          Resources.createResource(memory/2, vcores/2), mockNodeStatus);
 
     // nodes should be in RUNNING state
     RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
         nm1.getNodeId());
     RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
         nm2.getNodeId());
-    node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null));
-    node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null));
+    node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null,
+        mockNodeStatus));
+    node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null,
+        mockNodeStatus));
 
     // Submit an application
     Application application = new Application("user1", resourceManager);
@@ -210,9 +217,10 @@ public class TestResourceManager {
   public void testNodeHealthReportIsNotNull() throws Exception{
     String host1 = "host1";
     final int memory = 4 * 1024;
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 = 
       registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(memory, 1));
+          Resources.createResource(memory, 1), mockNodeStatus);
     nm1.heartbeat();
     nm1.heartbeat();
     Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 698546d..94d440a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -2682,10 +2684,14 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         RegisterNodeManagerRequest.class);
     NodeId nodeId = NodeId.newInstance("host2", 1234);
     Resource capability = BuilderUtils.newResource(1024, 1);
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     req.setResource(capability);
     req.setNodeId(nodeId);
     req.setHttpPort(1234);
     req.setNMVersion(YarnVersionInfo.getVersion());
+    req.setNodeStatus(mockNodeStatus);
     ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1);
     ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
     ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index c2bc611..a09be0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -139,13 +140,15 @@ public class TestRMAppLogAggregationStatus {
     Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node1 =
         new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
-    node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+    node1.handle(new RMNodeStartedEvent(nodeId1, null, null, mockNodeStatus));
     rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
 
     NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
     RMNodeImpl node2 =
         new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
-    node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+    node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null,
+        mockNodeStatus));
     rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
 
     // The initial log aggregation status for these two nodes
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
index f69faf4..017a1e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.junit.Assert;
 
 import org.slf4j.Logger;
@@ -135,12 +138,15 @@ public class TestNMExpiry {
     String hostname3 = "localhost3";
     Resource capability = BuilderUtils.newResource(1024, 1);
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     RegisterNodeManagerRequest request1 = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);
     NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
     request1.setNodeId(nodeId1);
     request1.setHttpPort(0);
     request1.setResource(capability);
+    request1.setNodeStatus(mockNodeStatus);
     resourceTrackerService.registerNodeManager(request1);
 
     RegisterNodeManagerRequest request2 = recordFactory
@@ -149,6 +155,7 @@ public class TestNMExpiry {
     request2.setNodeId(nodeId2);
     request2.setHttpPort(0);
     request2.setResource(capability);
+    request2.setNodeStatus(mockNodeStatus);
     resourceTrackerService.registerNodeManager(request2);
     
     int waitCount = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
index 3c4e6b4..817fb9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
@@ -178,9 +181,13 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase {
     RegisterNodeManagerRequest request1 = recordFactory
         .newRecordInstance(RegisterNodeManagerRequest.class);
     NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     request1.setNodeId(nodeId1);
     request1.setHttpPort(0);
     request1.setResource(capability);
+    request1.setNodeStatus(mockNodeStatus);
     resourceTrackerService.registerNodeManager(request1);
     Assert.assertNotNull(context.getRMNodes().get(nodeId1));
     // verify Scheduler and RMContext use same RMNode reference.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index ba409b1..e66d539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -913,9 +915,13 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       RegisterNodeManagerRequest request1 =
           recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
       NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+
+      NodeStatus mockNodeStatus = createMockNodeStatus();
+
       request1.setNodeId(nodeId1);
       request1.setHttpPort(0);
       request1.setResource(capability);
+      request1.setNodeStatus(mockNodeStatus);
       privateResourceTrackerService.registerNodeManager(request1);
       privateDispatcher.await();
       Resource clusterResource =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
index 83a354d..a75be77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -43,6 +44,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.junit.Assume.assumeTrue;
 
 public class TestSchedulerHealth {
@@ -170,11 +172,11 @@ public class TestSchedulerHealth {
   }
 
   private NodeManager registerNode(String hostName, int containerManagerPort,
-      int httpPort, String rackName, Resource capability) throws IOException,
-      YarnException {
+      int httpPort, String rackName, Resource capability, NodeStatus nodeStatus)
+      throws IOException, YarnException {
     NodeManager nm =
         new NodeManager(hostName, containerManagerPort, httpPort, rackName,
-          capability, resourceManager);
+          capability, resourceManager, nodeStatus);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
           .get(nm.getNodeId()));
@@ -200,11 +202,13 @@ public class TestSchedulerHealth {
     assumeTrue("This test is only supported on Capacity Scheduler",
       isCapacityScheduler);
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-          Resources.createResource(5 * 1024, 1));
+          Resources.createResource(5 * 1024, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -275,15 +279,17 @@ public class TestSchedulerHealth {
     assumeTrue("This test is only supported on Capacity Scheduler",
       isCapacityScheduler);
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register nodes
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-          Resources.createResource(2 * 1024, 1));
+          Resources.createResource(2 * 1024, 1), mockNodeStatus);
     String host_1 = "host_1";
     NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-          Resources.createResource(5 * 1024, 1));
+          Resources.createResource(5 * 1024, 1), mockNodeStatus);
     nodeUpdate(nm_0);
     nodeUpdate(nm_1);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index b603948..3b2a557 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +44,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -227,9 +229,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
   private NodeManager registerNode(ResourceManager rm, String hostName,
       int containerManagerPort, int httpPort, String rackName,
-          Resource capability) throws IOException, YarnException {
+      Resource capability, NodeStatus nodeStatus)
+      throws IOException, YarnException {
     NodeManager nm = new NodeManager(hostName,
-        containerManagerPort, httpPort, rackName, capability, rm);
+        containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
             .get(nm.getNodeId()));
@@ -272,10 +275,10 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
   private NodeManager registerNode(String hostName, int containerManagerPort,
                                    int httpPort, String rackName,
-                                   Resource capability)
+                                   Resource capability, NodeStatus nodeStatus)
           throws IOException, YarnException {
     NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
-        rackName, capability, resourceManager);
+        rackName, capability, resourceManager, nodeStatus);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
@@ -288,17 +291,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
     LOG.info("--- START: testCapacityScheduler ---");
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(4 * GB, 1));
+            Resources.createResource(4 * GB, 1), mockNodeStatus);
 
     // Register node2
     String host_1 = "host_1";
     NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(2 * GB, 1));
+            Resources.createResource(2 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -428,11 +433,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     when(mC.getConfigurationProvider()).thenReturn(
         new LocalConfigurationProvider());
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host0 = "host_0";
     NodeManager nm0 =
         registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(10 * GB, 10));
+        Resources.createResource(10 * GB, 10), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority0 = Priority.newInstance(0);
@@ -530,11 +537,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     when(mC.getConfigurationProvider()).thenReturn(
             new LocalConfigurationProvider());
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host0 = "host_0";
     NodeManager nm0 =
         registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(10 * GB, 10));
+        Resources.createResource(10 * GB, 10), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority0 = Priority.newInstance(0);
@@ -1948,17 +1957,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   public void testMoveAppForMoveToQueueWithFreeCap() throws Exception {
 
     ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(4 * GB, 1));
+        Resources.createResource(4 * GB, 1), mockNodeStatus);
 
     // Register node2
     String host_1 = "host_1";
     NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(2 * GB, 1));
+        Resources.createResource(2 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -2064,17 +2076,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
     ResourceScheduler scheduler = resourceManager.getResourceScheduler();
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(5 * GB, 1));
+        Resources.createResource(5 * GB, 1), mockNodeStatus);
 
     // Register node2
     String host_1 = "host_1";
     NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(5 * GB, 1));
+        Resources.createResource(5 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -2186,11 +2200,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
     ResourceScheduler scheduler = resourceManager.getResourceScheduler();
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(6 * GB, 1));
+        Resources.createResource(6 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -2234,17 +2250,19 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   public void testMoveAppQueueMetricsCheck() throws Exception {
     ResourceScheduler scheduler = resourceManager.getResourceScheduler();
 
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(5 * GB, 1));
+        Resources.createResource(5 * GB, 1), mockNodeStatus);
 
     // Register node2
     String host_1 = "host_1";
     NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(5 * GB, 1));
+        Resources.createResource(5 * GB, 1), mockNodeStatus);
 
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
@@ -4126,9 +4144,12 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   }
   @Test
   public void testRemovedNodeDecomissioningNode() throws Exception {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register nodemanager
     NodeManager nm = registerNode("host_decom", 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@@ -4171,10 +4192,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     ((CapacityScheduler) resourceManager.getResourceScheduler())
         .setRMContext(spyContext);
     ((AsyncDispatcher) mockDispatcher).start();
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node
     String host_0 = "host_0";
     NodeManager nm_0 = registerNode(host_0, 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1e24650..660b15f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -5101,9 +5103,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   @Test
   public void testRemovedNodeDecomissioningNode() throws Exception {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     // Register nodemanager
     NodeManager nm = registerNode("host_decom", 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@@ -5146,10 +5150,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ((FairScheduler) resourceManager.getResourceScheduler())
         .setRMContext(spyContext);
     ((AsyncDispatcher) mockDispatcher).start();
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node
     String host_0 = "host_0";
     NodeManager nm_0 = registerNode(host_0, 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@@ -5189,10 +5196,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   private NodeManager registerNode(String hostName, int containerManagerPort,
                                    int httpPort, String rackName,
-                                   Resource capability)
+                                   Resource capability, NodeStatus nodeStatus)
       throws IOException, YarnException {
     NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
-        rackName, capability, resourceManager);
+        rackName, capability, resourceManager, nodeStatus);
 
     // after YARN-5375, scheduler event is processed in rm main dispatcher,
     // wait it processed, or may lead dead lock
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index f9093c1..d34e8b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -141,10 +143,10 @@ public class TestFifoScheduler {
   
   private NodeManager registerNode(String hostName, int containerManagerPort,
                                    int nmHttpPort, String rackName,
-                                   Resource capability)
+                                   Resource capability, NodeStatus nodeStatus)
       throws IOException, YarnException {
     NodeManager nm = new NodeManager(hostName, containerManagerPort,
-        nmHttpPort, rackName, capability, resourceManager);
+        nmHttpPort, rackName, capability, resourceManager, nodeStatus);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
             .get(nm.getNodeId()));
@@ -403,19 +405,21 @@ public class TestFifoScheduler {
     LOG.info("--- START: testFifoScheduler ---");
         
     final int GB = 1024;
-    
+
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node1
     String host_0 = "host_0";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = 
       registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(4 * GB, 1));
+      Resources.createResource(4 * GB, 1), mockNodeStatus);
     nm_0.heartbeat();
     
     // Register node2
     String host_1 = "host_1";
     org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = 
       registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, 
-          Resources.createResource(2 * GB, 1));
+      Resources.createResource(2 * GB, 1), mockNodeStatus);
     nm_1.heartbeat();
 
     // ResourceRequest priorities
@@ -1194,9 +1198,12 @@ public class TestFifoScheduler {
 
   @Test
   public void testRemovedNodeDecomissioningNode() throws Exception {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register nodemanager
     NodeManager nm = registerNode("host_decom", 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
@@ -1239,10 +1246,13 @@ public class TestFifoScheduler {
     ((FifoScheduler) resourceManager.getResourceScheduler())
         .setRMContext(spyContext);
     ((AsyncDispatcher) mockDispatcher).start();
+    NodeStatus mockNodeStatus = createMockNodeStatus();
+
     // Register node
     String host_0 = "host_0";
     NodeManager nm_0 = registerNode(host_0, 1234, 2345,
-        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
+        mockNodeStatus);
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index f7d470d..33345a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
 import static org.apache.hadoop.yarn.webapp.WebServicesTestUtils.assertResponseStatusCode;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -191,8 +192,10 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
   }
 
   private void sendStartedEvent(RMNode node) {
+    NodeStatus mockNodeStatus = createMockNodeStatus();
     ((RMNodeImpl) node)
-        .handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+        .handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
+        mockNodeStatus));
   }
 
   private void sendLostEvent(RMNode node) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org