You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2022/05/13 16:22:16 UTC

[hadoop] branch branch-2.10 updated: YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ec5f3e4ed16 YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)
ec5f3e4ed16 is described below

commit ec5f3e4ed16c4aedf06fcbf0eda28e7bcfe8d22e
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Fri May 13 17:21:37 2022 +0100

    YARN-11125. Backport YARN-6483 to branch-2.10 (#4258)
    
    Co-authored-by: Ashutosh Gupta <as...@amazon.com>
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
 .../apache/hadoop/yarn/api/records/NodeReport.java |  47 +++++--
 .../hadoop/yarn/api/records/NodeUpdateType.java}   |  11 +-
 .../src/main/proto/yarn_protos.proto               |   8 ++
 .../hadoop/yarn/client/ProtocolHATestBase.java     |  50 ++++----
 .../apache/hadoop/yarn/client/cli/TestYarnCLI.java |  40 +++---
 .../yarn/api/records/impl/pb/NodeReportPBImpl.java |  50 +++++++-
 .../yarn/api/records/impl/pb/ProtoUtils.java       |  12 ++
 .../hadoop/yarn/server/utils/BuilderUtils.java     |  14 ++-
 .../server/resourcemanager/ClientRMService.java    |   3 +-
 .../DecommissioningNodesWatcher.java               |  38 +-----
 .../resourcemanager/DefaultAMSProcessor.java       |  12 +-
 .../server/resourcemanager/NodesListManager.java   |  84 +++++++++----
 .../resourcemanager/NodesListManagerEventType.java |   3 +-
 .../yarn/server/resourcemanager/rmapp/RMApp.java   |  10 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java    |  11 +-
 .../rmapp/RMAppNodeUpdateEvent.java                |   9 +-
 .../yarn/server/resourcemanager/rmnode/RMNode.java |   2 +-
 .../server/resourcemanager/rmnode/RMNodeImpl.java  |   5 +
 .../hadoop/yarn/server/resourcemanager/MockRM.java |  21 +++-
 .../resourcemanager/TestClientRMService.java       |  89 ++++++++++---
 .../TestDecommissioningNodesWatcher.java           |  11 +-
 .../resourcemanager/TestRMNodeTransitions.java     |  49 ++++----
 .../TestResourceTrackerService.java                | 139 ++++++++++++++++++---
 .../applicationsmanager/MockAsm.java               |   4 +-
 .../TestAMRMRPCNodeUpdates.java                    | 107 +++++++++++-----
 .../server/resourcemanager/rmapp/MockRMApp.java    |   4 +-
 26 files changed, 598 insertions(+), 235 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
index 885a3b4b35a..3a80641bb6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
@@ -53,7 +53,8 @@ public abstract class NodeReport {
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
     return newInstance(nodeId, nodeState, httpAddress, rackName, used,
-        capability, numContainers, healthReport, lastHealthReportTime, null);
+        capability, numContainers, healthReport, lastHealthReportTime,
+        null, null, null);
   }
 
   @Private
@@ -61,7 +62,8 @@ public abstract class NodeReport {
   public static NodeReport newInstance(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
-      Set<String> nodeLabels) {
+      Set<String> nodeLabels, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     NodeReport nodeReport = Records.newRecord(NodeReport.class);
     nodeReport.setNodeId(nodeId);
     nodeReport.setNodeState(nodeState);
@@ -73,6 +75,8 @@ public abstract class NodeReport {
     nodeReport.setHealthReport(healthReport);
     nodeReport.setLastHealthReportTime(lastHealthReportTime);
     nodeReport.setNodeLabels(nodeLabels);
+    nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+    nodeReport.setNodeUpdateType(nodeUpdateType);
     return nodeReport;
   }
 
@@ -186,8 +190,8 @@ public abstract class NodeReport {
   public abstract void setLastHealthReportTime(long lastHealthReport);
   
   /**
-   * Get labels of this node
-   * @return labels of this node
+   * Get labels of this node.
+   * @return labels of this node.
    */
   @Public
   @Stable
@@ -198,8 +202,8 @@ public abstract class NodeReport {
   public abstract void setNodeLabels(Set<String> nodeLabels);
 
   /**
-   * Get containers aggregated resource utilization in a node
-   * @return containers resource utilization
+   * Get containers aggregated resource utilization in a node.
+   * @return containers resource utilization.
    */
   @Public
   @Stable
@@ -217,8 +221,8 @@ public abstract class NodeReport {
   }
 
   /**
-   * Get node resource utilization
-   * @return node resource utilization
+   * Get node resource utilization.
+   * @return node resource utilization.
    */
   @Public
   @Stable
@@ -227,4 +231,31 @@ public abstract class NodeReport {
   @Private
   @Unstable
   public abstract void setNodeUtilization(ResourceUtilization nodeUtilization);
+
+  /**
+   * Optional decommissioning timeout in seconds (null indicates absent
+   * timeout).
+   * @return the decommissioning timeout in seconds.
+   */
+  public Integer getDecommissioningTimeout() {
+    return null;
+  }
+
+  /**
+   * Set the decommissioning timeout in seconds (null indicates absent timeout).
+   * */
+  public void setDecommissioningTimeout(Integer decommissioningTimeout) {}
+
+  /**
+   * Optional node update type (null indicates absent update type).
+   * @return the node update type.
+   */
+  public NodeUpdateType getNodeUpdateType() {
+    return NodeUpdateType.NODE_UNUSABLE;
+  }
+
+  /**
+   * Set the node update type (null indicates absent node update type).
+   * */
+  public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
similarity index 78%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
index 2afc8e6e4a7..9152a6a3d9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
@@ -16,9 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.resourcemanager;
+package org.apache.hadoop.yarn.api.records;
 
-public enum NodesListManagerEventType {
+/**
+ * <p>Taxonomy of the <code>NodeState</code> that a
+ * <code>Node</code> might transition into.</p>
+ * */
+public enum NodeUpdateType {
   NODE_USABLE,
-  NODE_UNUSABLE
+  NODE_UNUSABLE,
+  NODE_DECOMMISSIONING
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 214a2b812f0..5b2d51c5bfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -325,6 +325,12 @@ message NodeIdProto {
   optional int32 port = 2;
 }
 
+enum NodeUpdateTypeProto {
+  NODE_USABLE = 0;
+  NODE_UNUSABLE = 1;
+  NODE_DECOMMISSIONING = 2;
+}
+
 message NodeReportProto {
   optional NodeIdProto nodeId = 1;
   optional string httpAddress = 2;
@@ -338,6 +344,8 @@ message NodeReportProto {
   repeated string node_labels = 10;
   optional ResourceUtilizationProto containers_utilization = 11;
   optional ResourceUtilizationProto node_utilization = 12;
+  optional uint32 decommissioning_timeout = 13;
+  optional NodeUpdateTypeProto node_update_type = 14;
 }
 
 message NodeIdToLabelsProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index 9bfd606cfdf..c56fa1a8916 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -18,38 +18,30 @@
 
 package org.apache.hadoop.yarn.client;
 
-import com.google.common.base.Supplier;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.CollectorInfo;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import com.google.common.base.Supplier;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
@@ -78,14 +70,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
@@ -117,6 +113,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
@@ -130,8 +127,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Before;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 /**
@@ -628,7 +628,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public ApplicationReport createFakeAppReport() {
-      ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+      ApplicationId appId = ApplicationId.newInstance(1000L, 1);
       ApplicationAttemptId attemptId =
           ApplicationAttemptId.newInstance(appId, 1);
       // create a fake application report
@@ -648,7 +648,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public ApplicationId createFakeAppId() {
-      return ApplicationId.newInstance(1000l, 1);
+      return ApplicationId.newInstance(1000L, 1);
     }
 
     public ApplicationAttemptId createFakeApplicationAttemptId() {
@@ -667,7 +667,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
       NodeId nodeId = NodeId.newInstance("localhost", 0);
       NodeReport report =
           NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
-              "rack1", null, null, 4, null, 1000l, null);
+              "rack1", null, null, 4, null, 1000L);
       List<NodeReport> reports = new ArrayList<NodeReport>();
       reports.add(report);
       return reports;
@@ -691,8 +691,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     public ApplicationAttemptReport createFakeApplicationAttemptReport() {
       return ApplicationAttemptReport.newInstance(
           createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
-          YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l,
-          1200l);
+          YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
+          1200L);
     }
 
     public List<ApplicationAttemptReport>
@@ -705,7 +705,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
 
     public ContainerReport createFakeContainerReport() {
       return ContainerReport.newInstance(createFakeContainerId(), null,
-          NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+          NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0,
           ContainerState.COMPLETE,
           "http://" + NodeId.newInstance("localhost", 0).toString());
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index bc21fdeca4b..1b2c13979b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -17,18 +17,6 @@
  */
 package org.apache.hadoop.yarn.client.cli;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -46,6 +34,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.time.DateFormatUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -88,14 +83,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mortbay.log.Log;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class TestYarnCLI {
 
@@ -2072,7 +2072,7 @@ public class TestYarnCLI {
       NodeReport nodeReport = NodeReport.newInstance(NodeId
         .newInstance("host" + i, 0), state, "host" + 1 + ":8888",
           "rack1", Records.newRecord(Resource.class), Records
-              .newRecord(Resource.class), 0, "", 0, nodeLabels);
+              .newRecord(Resource.class), 0, "", 0, nodeLabels, null, null);
       if (!emptyResourceUtilization) {
         ResourceUtilization containersUtilization = ResourceUtilization
             .newInstance(1024, 2048, 4);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
index 0d205e90da1..ced588d30e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -150,8 +151,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setNodeId(NodeId nodeId) {
     maybeInitBuilder();
-    if (nodeId == null)
+    if (nodeId == null) {
       builder.clearNodeId();
+    }
     this.nodeId = nodeId;
   }
   
@@ -177,8 +179,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setCapability(Resource capability) {
     maybeInitBuilder();
-    if (capability == null)
+    if (capability == null) {
       builder.clearCapability();
+    }
     this.capability = capability;
   }
 
@@ -215,8 +218,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setUsed(Resource used) {
     maybeInitBuilder();
-    if (used == null)
+    if (used == null) {
       builder.clearUsed();
+    }
     this.used = used;
   }
 
@@ -234,8 +238,9 @@ public class NodeReportPBImpl extends NodeReport {
 
   @Override
   public boolean equals(Object other) {
-    if (other == null)
+    if (other == null) {
       return false;
+    }
     if (other.getClass().isAssignableFrom(this.getClass())) {
       return this.getProto().equals(this.getClass().cast(other).getProto());
     }
@@ -278,8 +283,9 @@ public class NodeReportPBImpl extends NodeReport {
   }
 
   private void mergeLocalToProto() {
-    if (viaProto)
+    if (viaProto) {
       maybeInitBuilder();
+    }
     mergeLocalToBuilder();
     proto = builder.build();
     viaProto = true;
@@ -387,4 +393,38 @@ public class NodeReportPBImpl extends NodeReport {
     }
     this.nodeUtilization = nodeResourceUtilization;
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasDecommissioningTimeout())
+        ? p.getDecommissioningTimeout() : null;
+  }
+
+  @Override
+  public void setDecommissioningTimeout(Integer decommissioningTimeout) {
+    maybeInitBuilder();
+    if (decommissioningTimeout == null || decommissioningTimeout < 0) {
+      builder.clearDecommissioningTimeout();
+      return;
+    }
+    builder.setDecommissioningTimeout(decommissioningTimeout);
+  }
+
+  @Override
+  public NodeUpdateType getNodeUpdateType() {
+    NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasNodeUpdateType()) ?
+        ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null;
+  }
+
+  @Override
+  public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
+    maybeInitBuilder();
+    if (nodeUpdateType == null) {
+      builder.clearNodeUpdateType();
+      return;
+    }
+    builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 8176068984f..e9c89c536ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -356,6 +358,16 @@ public class ProtoUtils {
     return ContainerRetryPolicy.valueOf(e.name());
   }
 
+  /*
+  * NodeUpdateType
+  */
+  public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) {
+    return NodeUpdateTypeProto.valueOf(e.name());
+  }
+  public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) {
+    return NodeUpdateType.valueOf(e.name());
+  }
+
   /*
    * ExecutionType
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index cc3bf38046d..b6145c99ef0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -187,23 +188,26 @@ public class BuilderUtils {
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
     return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
-        capability, numContainers, healthReport, lastHealthReportTime, null);
+        capability, numContainers, healthReport, lastHealthReportTime,
+        null, null, null);
   }
 
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
-      Set<String> nodeLabels) {
+      Set<String> nodeLabels, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
         capability, numContainers, healthReport, lastHealthReportTime,
-        nodeLabels, null, null);
+        nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
   }
 
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
       Set<String> nodeLabels, ResourceUtilization containersUtilization,
-      ResourceUtilization nodeUtilization) {
+      ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
     nodeReport.setNodeId(nodeId);
     nodeReport.setNodeState(nodeState);
@@ -217,6 +221,8 @@ public class BuilderUtils {
     nodeReport.setNodeLabels(nodeLabels);
     nodeReport.setAggregatedContainersUtilization(containersUtilization);
     nodeReport.setNodeUtilization(nodeUtilization);
+    nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+    nodeReport.setNodeUpdateType(nodeUpdateType);
     return nodeReport;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 98322b1a292..b01d606d151 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1101,7 +1101,8 @@ public class ClientRMService extends AbstractService implements
             rmNode.getTotalCapability(), numContainers,
             rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
             rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
-            rmNode.getNodeUtilization());
+            rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
+            null);
 
     return report;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index 9631803e3fd..ca3eb798414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher {
 
   private final RMContext rmContext;
 
-  // Default timeout value in mills.
-  // Negative value indicates no timeout. 0 means immediate.
-  private long defaultTimeoutMs =
-      1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
-
   // Once a RMNode is observed in DECOMMISSIONING state,
   // All its ContainerStatus update are tracked inside DecomNodeContext.
   class DecommissioningNodeContext {
@@ -105,16 +100,15 @@ public class DecommissioningNodesWatcher {
 
     private long lastUpdateTime;
 
-    public DecommissioningNodeContext(NodeId nodeId) {
+    public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
       this.nodeId = nodeId;
       this.appIds = new HashSet<ApplicationId>();
       this.decommissioningStartTime = mclock.getTime();
-      this.timeoutMs = defaultTimeoutMs;
+      this.timeoutMs = 1000L * timeoutSec;
     }
 
-    void updateTimeout(Integer timeoutSec) {
-      this.timeoutMs = (timeoutSec == null)?
-          defaultTimeoutMs : (1000L * timeoutSec);
+    void updateTimeout(int timeoutSec) {
+      this.timeoutMs = 1000L * timeoutSec;
     }
   }
 
@@ -132,7 +126,6 @@ public class DecommissioningNodesWatcher {
   }
 
   public void init(Configuration conf) {
-    readDecommissioningTimeout(conf);
     int v = conf.getInt(
         YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
         YarnConfiguration
@@ -162,7 +155,8 @@ public class DecommissioningNodesWatcher {
       }
     } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
       if (context == null) {
-        context = new DecommissioningNodeContext(rmNode.getNodeID());
+        context = new DecommissioningNodeContext(rmNode.getNodeID(),
+            rmNode.getDecommissioningTimeout());
         decomNodes.put(rmNode.getNodeID(), context);
         context.nodeState = rmNode.getState();
         context.decommissionedTime = 0;
@@ -416,24 +410,4 @@ public class DecommissioningNodesWatcher {
       LOG.debug("Decommissioning node: " + sb.toString());
     }
   }
-
-  // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
-  // This enables DecommissioningNodesWatcher to pick up new value
-  // without ResourceManager restart.
-  private void readDecommissioningTimeout(Configuration conf) {
-    try {
-      if (conf == null) {
-        conf = new YarnConfiguration();
-      }
-      int v = conf.getInt(
-          YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
-          YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
-      if (defaultTimeoutMs != 1000L * v) {
-        defaultTimeoutMs = 1000L * v;
-        LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
-      }
-    } catch (Exception e) {
-      LOG.info("Error readDecommissioningTimeout ", e);
-    }
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index a18b4c7e0ff..cab57f828f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -83,6 +84,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -326,10 +329,12 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
   }
 
   private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
-    List<RMNode> updatedNodes = new ArrayList<>();
+    Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
     if(app.pullRMNodeUpdates(updatedNodes) > 0) {
       List<NodeReport> updatedNodeReports = new ArrayList<>();
-      for(RMNode rmNode: updatedNodes) {
+      for(Map.Entry<RMNode, NodeUpdateType> rmNodeEntry :
+          updatedNodes.entrySet()) {
+        RMNode rmNode = rmNodeEntry.getKey();
         SchedulerNodeReport schedulerNodeReport =
             getScheduler().getNodeReport(rmNode.getNodeID());
         Resource used = BuilderUtils.newResource(0, 0);
@@ -344,7 +349,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
                 rmNode.getHttpAddress(), rmNode.getRackName(), used,
                 rmNode.getTotalCapability(), numContainers,
                 rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
-                rmNode.getNodeLabels());
+                rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(),
+                rmNodeEntry.getValue());
 
         updatedNodeReports.add(report);
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 4c43b8a6658..a40b12c1017 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -31,6 +31,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,8 +42,8 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.HostsFileReader.HostDetails;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -60,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
-import com.google.common.annotations.VisibleForTesting;
-
 @SuppressWarnings("unchecked")
 public class NodesListManager extends CompositeService implements
     EventHandler<NodesListManagerEvent> {
@@ -72,6 +72,11 @@ public class NodesListManager extends CompositeService implements
   private Configuration conf;
   private final RMContext rmContext;
 
+  // Default decommissioning timeout value in seconds.
+  // Negative value indicates no timeout. 0 means immediate.
+  private int defaultDecTimeoutSecs =
+      YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
   private String includesFile;
   private String excludesFile;
 
@@ -214,6 +219,11 @@ public class NodesListManager extends CompositeService implements
   private void refreshHostsReader(
       Configuration yarnConf, boolean graceful, Integer timeout)
           throws IOException, YarnException {
+    // resolve the default timeout to the decommission timeout that is
+    // configured at this moment
+    if (null == timeout) {
+      timeout = readDecommissioningTimeout(yarnConf);
+    }
     if (null == yarnConf) {
       yarnConf = new YarnConfiguration();
     }
@@ -252,7 +262,7 @@ public class NodesListManager extends CompositeService implements
   // Gracefully decommission excluded nodes that are not already
   // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
   // that are already DECOMMISSIONED or DECOMMISSIONING.
-  private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+  private void handleExcludeNodeList(boolean graceful, int timeout) {
     // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
     List<RMNode> nodesToRecom = new ArrayList<RMNode>();
 
@@ -458,36 +468,40 @@ public class NodesListManager extends CompositeService implements
         && !(excludeList.contains(hostName) || excludeList.contains(ip));
   }
 
+  private void sendRMAppNodeUpdateEventToNonFinalizedApps(
+      RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
+    for(RMApp app : rmContext.getRMApps().values()) {
+      if (!app.isAppFinalStateStored()) {
+        this.rmContext
+            .getDispatcher()
+            .getEventHandler()
+            .handle(
+                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                    appNodeUpdateType));
+      }
+    }
+  }
+
   @Override
   public void handle(NodesListManagerEvent event) {
     RMNode eventNode = event.getNode();
     switch (event.getType()) {
     case NODE_UNUSABLE:
       LOG.debug(eventNode + " reported unusable");
-      for(RMApp app: rmContext.getRMApps().values()) {
-        if (!app.isAppFinalStateStored()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_UNUSABLE));
-        }
-      }
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_UNUSABLE);
       break;
     case NODE_USABLE:
       LOG.debug(eventNode + " reported usable");
-      for (RMApp app : rmContext.getRMApps().values()) {
-        if (!app.isAppFinalStateStored()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_USABLE));
-        }
-      }
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_USABLE);
+      break;
+    case NODE_DECOMMISSIONING:
+      LOG.debug(eventNode + " reported decommissioning");
+      sendRMAppNodeUpdateEventToNonFinalizedApps(
+          eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
       break;
+
     default:
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
@@ -606,6 +620,28 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
+  // Reads the decommissioning timeout (seconds) from pConf, or from a new
+  // YarnConfiguration when pConf is null, caching it so changes apply without
+  // a ResourceManager restart. On failure the cached value is returned.
+  private int readDecommissioningTimeout(Configuration pConf) {
+    try {
+      if (pConf == null) {
+        pConf = new YarnConfiguration();
+      }
+      int configuredDefaultDecTimeoutSecs =
+          pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+              YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+      if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
+        defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
+        LOG.info("Use new decommissioningTimeoutSecs: "
+            + defaultDecTimeoutSecs);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error readDecommissioningTimeout ", e);
+    }
+    return defaultDecTimeoutSecs;
+  }
+
   /**
    * A NodeId instance needed upon startup for populating inactive nodes Map.
    * It only knows the hostname/ip and marks the port to -1 or invalid.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
index 2afc8e6e4a7..db16bc4e121 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 public enum NodesListManagerEventType {
   NODE_USABLE,
-  NODE_UNUSABLE
+  NODE_UNUSABLE,
+  NODE_DECOMMISSIONING
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 936f78b75b4..5711c2bcd16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -154,10 +154,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * received by the RMApp. Updates can be node becoming lost or becoming
    * healthy etc. The method clears the information from the {@link RMApp}. So
    * each call to this method gives the delta from the previous call.
-   * @param updatedNodes Collection into which the updates are transferred
-   * @return the number of nodes added to the {@link Collection}
+   * @param updatedNodes Map into which the updates are transferred, with each
+   * updated node as the key, and the {@link NodeUpdateType} for that update
+   * as the corresponding value.
+   * @return the number of nodes added to the {@link Map}
    */
-  int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
+  int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes);
 
   /**
    * The finish time of the {@link RMApp}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 48a4ed80ccb..d2101df3a2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -59,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -146,7 +145,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Map<ApplicationAttemptId, RMAppAttempt> attempts
       = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
   private final long submitTime;
-  private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
+  private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
   private final String applicationType;
   private final Set<String> applicationTags;
 
@@ -703,11 +702,11 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
-  public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
     this.writeLock.lock();
     try {
       int updatedNodeCount = this.updatedNodes.size();
-      updatedNodes.addAll(this.updatedNodes);
+      upNodes.putAll(this.updatedNodes);
       this.updatedNodes.clear();
       return updatedNodeCount;
     } finally {
@@ -1025,7 +1024,7 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
     NodeState nodeState = node.getState();
-    updatedNodes.add(node);
+    updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
     LOG.debug("Received node update event:" + type + " for node:" + node
         + " with state:" + nodeState);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
index ba8af9801f7..245d9a6edbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
@@ -19,13 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class RMAppNodeUpdateEvent extends RMAppEvent {
 
   public enum RMAppNodeUpdateType {
     NODE_USABLE, 
-    NODE_UNUSABLE
+    NODE_UNUSABLE,
+    NODE_DECOMMISSIONING;
+
+    public static NodeUpdateType convertToNodeUpdateType(
+        RMAppNodeUpdateType rmAppNodeUpdateType) {
+      return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
+    }
   }
 
   private final RMNode node;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 4d603081bf9..ab6e40918fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -179,7 +179,7 @@ public interface RMNode {
   
   /**
    * Optional decommissioning timeout in second
-   * (null indicates default timeout).
+   * (null indicates absent timeout).
    * @return the decommissioning timeout in second.
    */
   Integer getDecommissioningTimeout();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 5a188d060c8..fca5115c889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1176,6 +1176,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
       rmNode.decommissioningTimeout = timeout;
+      // Tell NodesListManager to notify every RMApp so that each
+      // ApplicationMaster can take any required action.
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
       if (rmNode.originalTotalCapability == null){
         rmNode.originalTotalCapability =
             Resources.clone(rmNode.totalCapability);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 915c719c2d6..e7240b267c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,8 +27,10 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-
 import java.util.Set;
+
+import org.junit.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -92,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -112,7 +114,6 @@ import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.Assert;
 
 
 @SuppressWarnings("unchecked")
@@ -956,12 +957,26 @@ public class MockRM extends ResourceManager {
         node.getState());
   }
 
+  public void sendNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
+    RMNodeImpl node = (RMNodeImpl)
+        getRMContext().getRMNodes().get(nm.getNodeId());
+    Assert.assertNotNull("node shouldn't be null", node);
+    node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
+  }
+
   public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
     RMNodeImpl node = (RMNodeImpl)
         getRMContext().getRMNodes().get(nm.getNodeId());
+    Assert.assertNotNull("node shouldn't be null", node);
     node.handle(new RMNodeEvent(nm.getNodeId(), event));
   }
 
+  public Integer getDecommissioningTimeout(NodeId nodeid) {
+    return this.getRMContext().getRMNodes()
+        .get(nodeid).getDecommissioningTimeout();
+  }
+
   public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 1b13acbc9bc..fe51878fa31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -18,18 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -51,6 +39,13 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.mockito.Matchers;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -153,22 +148,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
-
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.UTCClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Test;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.mockito.Matchers;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 public class TestClientRMService {
 
@@ -183,6 +182,50 @@ public class TestClientRMService {
   private final static String QUEUE_2 = "Q-2";
   private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
 
+  @Test
+  public void testGetDecommissioningClusterNodes() throws Exception {
+    MockRM rm = new MockRM() {
+      protected ClientRMService createClientRMService() {
+        return new ClientRMService(this.rmContext, scheduler,
+            this.rmAppManager, this.applicationACLsManager,
+            this.queueACLsManager,
+            this.getRMContext().getRMDelegationTokenSecretManager());
+      };
+    };
+    rm.start();
+
+    int nodeMemory = 1024;
+    MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+    rm.sendNodeStarted(nm1);
+    nm1.nodeHeartbeat(true);
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    Integer decommissioningTimeout = 600;
+    rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
+    rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+
+    // Create a client.
+    Configuration conf = new Configuration();
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    ApplicationClientProtocol client =
+        (ApplicationClientProtocol) rpc
+            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+
+    // Make call
+    List<NodeReport> nodeReports = client.getClusterNodes(
+        GetClusterNodesRequest.newInstance(
+            EnumSet.of(NodeState.DECOMMISSIONING)))
+        .getNodeReports();
+    Assert.assertEquals(1, nodeReports.size());
+    NodeReport nr = nodeReports.iterator().next();
+    Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
+    Assert.assertNull(nr.getNodeUpdateType());
+
+    rpc.stopProxy(client, conf);
+    rm.close();
+  }
+
   @Test
   public void testGetClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
@@ -231,6 +274,8 @@ public class TestClientRMService {
     
     // Check node's label = x
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
+    Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+    Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
 
     // Now make the node unhealthy.
     node.nodeHeartbeat(false);
@@ -254,6 +299,8 @@ public class TestClientRMService {
         nodeReports.get(0).getNodeState());
     
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
+    Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+    Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
     
     // Remove labels of host1
     map = new HashMap<NodeId, Set<String>>();
@@ -270,6 +317,8 @@ public class TestClientRMService {
     for (NodeReport report : nodeReports) {
       Assert.assertTrue(report.getNodeLabels() != null
           && report.getNodeLabels().isEmpty());
+      Assert.assertNull(report.getDecommissioningTimeout());
+      Assert.assertNull(report.getNodeUpdateType());
     }
 
     rpc.stopProxy(client, conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
index 690de308e23..09f1482eb5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -36,10 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
 
 /**
  * This class tests DecommissioningNodesWatcher.
@@ -69,7 +69,8 @@ public class TestDecommissioningNodesWatcher {
     MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
 
     // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
-    rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
+    rm.sendNodeGracefulDecommission(nm1,
+        YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
     rm.waitForState(id1, NodeState.DECOMMISSIONING);
 
     // Update status with decreasing number of running containers until 0.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 272633269a2..6ecb306fb8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -17,20 +17,21 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -48,8 +49,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
-    .AllocationExpirationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -71,13 +71,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestRMNodeTransitions {
 
@@ -98,13 +100,16 @@ public class TestRMNodeTransitions {
   }
 
   private NodesListManagerEvent nodesListManagerEvent = null;
-  
+  private List<NodeState> nodesListManagerEventsNodeStateSequence =
+      new LinkedList<>();
+
   private class TestNodeListManagerEventDispatcher implements
       EventHandler<NodesListManagerEvent> {
     
     @Override
     public void handle(NodesListManagerEvent event) {
       nodesListManagerEvent = event;
+      nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
     }
 
   }
@@ -150,7 +155,7 @@ public class TestRMNodeTransitions {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
     nodesListManagerEvent =  null;
-
+    nodesListManagerEventsNodeStateSequence.clear();
   }
   
   @After
@@ -712,6 +717,8 @@ public class TestRMNodeTransitions {
     node.handle(new RMNodeEvent(node.getNodeID(),
         RMNodeEventType.GRACEFUL_DECOMMISSION));
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
+        nodesListManagerEventsNodeStateSequence);
     Assert
         .assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
     Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
@@ -999,7 +1006,7 @@ public class TestRMNodeTransitions {
 
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
     Assert.assertNotNull(nodesListManagerEvent);
-    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+    Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
         nodesListManagerEvent.getType());
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7c5181487df..d81ad93de96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -18,13 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -39,7 +32,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -78,8 +85,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -96,17 +103,24 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestResourceTrackerService extends NodeLabelTestBase {
 
   private final static File TEMP_DIR = new File(System.getProperty(
       "test.build.data", "/tmp"), "decommision");
-  private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
+  private final File hostFile =
+      new File(TEMP_DIR + File.separator + "hostFile.txt");
   private final File excludeHostFile = new File(TEMP_DIR + File.separator +
       "excludeHostFile.txt");
+  private final File excludeHostXmlFile =
+      new File(TEMP_DIR + File.separator + "excludeHostFile.xml");
 
   private MockRM rm;
 
@@ -291,6 +305,68 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
   }
 
+  @Ignore
+  @Test
+  public void testGracefulDecommissionDefaultTimeoutResolution()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
+        .getAbsolutePath());
+
+    writeToHostsXmlFile(excludeHostXmlFile, Pair.<String, Integer>of("", null));
+    rm = new MockRM(conf);
+    rm.start();
+
+    int nodeMemory = 1024;
+    MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+    MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
+    MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);
+
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+    // Graceful decommission both host1 and host2, with
+    // non-default timeout for host1
+    final Integer nm1DecommissionTimeout = 20;
+    writeToHostsXmlFile(
+        excludeHostXmlFile,
+        Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
+        Pair.<String, Integer>of(nm2.getNodeId().getHost(), null));
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    Assert.assertEquals(
+        nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
+    Integer defaultDecTimeout =
+        conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+            YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+    Assert.assertEquals(
+        defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
+
+    // Graceful decommission host3 with a new default timeout
+    final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
+    writeToHostsXmlFile(
+        excludeHostXmlFile, Pair.<String, Integer>of(nm3.getNodeId().getHost(), null));
+    conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+        newDefaultDecTimeout);
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+    Assert.assertEquals(
+        newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
+  }
+
   /**
    * Graceful decommission node with running application.
    */
@@ -1965,16 +2041,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     rm.stop();
   }
 
+  private void ensureFileExists(File file) throws IOException {
+    if (!file.exists()) {
+      TEMP_DIR.mkdirs();
+      file.createNewFile();
+    }
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
     writeToHostsFile(hostFile, hosts);
   }
 
   private void writeToHostsFile(File file, String... hosts)
       throws IOException {
-    if (!file.exists()) {
-      TEMP_DIR.mkdirs();
-      file.createNewFile();
-    }
+    ensureFileExists(file);
     FileOutputStream fStream = null;
     try {
       fStream = new FileOutputStream(file);
@@ -1990,6 +2070,33 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     }
   }
 
+  private void writeToHostsXmlFile(
+      File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
+    ensureFileExists(file);
+    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+    Document doc = dbFactory.newDocumentBuilder().newDocument();
+    Element hosts = doc.createElement("hosts");
+    doc.appendChild(hosts);
+    for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) {
+      Element host = doc.createElement("host");
+      hosts.appendChild(host);
+      Element name = doc.createElement("name");
+      host.appendChild(name);
+      name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft()));
+      if (hostsAndTimeout.getRight() != null) {
+        Element timeout = doc.createElement("timeout");
+        host.appendChild(timeout);
+        timeout.appendChild(
+            doc.createTextNode(hostsAndTimeout.getRight().toString())
+        );
+      }
+    }
+    TransformerFactory transformerFactory = TransformerFactory.newInstance();
+    Transformer transformer = transformerFactory.newTransformer();
+    transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+    transformer.transform(new DOMSource(doc), new StreamResult(file));
+  }
+
   private void checkDecommissionedNMCount(MockRM rm, int count)
       throws InterruptedException {
     int waitCount = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index ebd8636f765..81a396f26c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -163,7 +163,7 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+    public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f9f0b746233..095d0027009 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -39,9 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 
 public class TestAMRMRPCNodeUpdates {
   private MockRM rm;
@@ -53,8 +54,8 @@ public class TestAMRMRPCNodeUpdates {
       @Override
       public void init(Configuration conf) {
         conf.set(
-          CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
-          "1.0");
+            CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+            "1.0");
         super.init(conf);
       }
     };
@@ -62,19 +63,19 @@ public class TestAMRMRPCNodeUpdates {
     rm.start();
     amService = rm.getApplicationMasterService();
   }
-  
+
   @After
   public void tearDown() {
     if (rm != null) {
       this.rm.stop();
     }
   }
-  
+
   private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
     nm.nodeHeartbeat(health);
     rm.drainEvents();
   }
-  
+
   private void syncNodeLost(MockNM nm) throws Exception {
     rm.sendNodeStarted(nm);
     rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
@@ -82,13 +83,20 @@ public class TestAMRMRPCNodeUpdates {
     rm.drainEvents();
   }
 
+  private void syncNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
+    rm.sendNodeGracefulDecommission(nm, timeout);
+    rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.drainEvents();
+  }
+
   private AllocateResponse allocate(final ApplicationAttemptId attemptId,
       final AllocateRequest req) throws Exception {
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
     Token<AMRMTokenIdentifier> token =
         rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
-          .getRMAppAttempt(attemptId).getAMRMToken();
+            .getRMAppAttempt(attemptId).getAMRMToken();
     ugi.addTokenIdentifier(token.decodeIdentifier());
     return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
       @Override
@@ -98,9 +106,42 @@ public class TestAMRMRPCNodeUpdates {
     });
   }
 
+  @Test
+  public void testAMRMDecommissioningNodes() throws Exception {
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
+    MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
+    rm.drainEvents();
+
+    RMApp app1 = rm.submitApp(2000);
+
+    // Trigger the scheduling so the AM gets 'launched' on nm1
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+
+    // register AM returns no unusable node
+    am1.registerAppAttempt();
+
+    Integer decommissioningTimeout = 600;
+    syncNodeGracefulDecommission(nm2, decommissioningTimeout);
+
+    AllocateRequest allocateRequest1 =
+        AllocateRequest.newInstance(0, 0F, null, null, null);
+    AllocateResponse response1 =
+        allocate(attempt1.getAppAttemptId(), allocateRequest1);
+    List<NodeReport> updatedNodes = response1.getUpdatedNodes();
+    Assert.assertEquals(1, updatedNodes.size());
+    NodeReport nr = updatedNodes.iterator().next();
+    Assert.assertEquals(
+        decommissioningTimeout, nr.getDecommissioningTimeout());
+    Assert.assertEquals(
+        NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
+  }
+
   @Test
   public void testAMRMUnusableNodes() throws Exception {
-    
+
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
     MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
     MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000);
@@ -114,7 +155,7 @@ public class TestAMRMRPCNodeUpdates {
 
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-    
+
     // register AM returns no unusable node
     am1.registerAppAttempt();
 
@@ -127,18 +168,20 @@ public class TestAMRMRPCNodeUpdates {
     Assert.assertEquals(0, updatedNodes.size());
 
     syncNodeHeartbeat(nm4, false);
-    
+
     // allocate request returns updated node
     allocateRequest1 =
         AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
-          null);
+            null);
     response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
     updatedNodes = response1.getUpdatedNodes();
     Assert.assertEquals(1, updatedNodes.size());
     NodeReport nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
-    
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
+
     // resending the allocate request returns the same result
     response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
     updatedNodes = response1.getUpdatedNodes();
@@ -146,30 +189,34 @@ public class TestAMRMRPCNodeUpdates {
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
 
     syncNodeLost(nm3);
-    
+
     // subsequent allocate request returns delta
     allocateRequest1 =
         AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
-          null);
+            null);
     response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
     updatedNodes = response1.getUpdatedNodes();
     Assert.assertEquals(1, updatedNodes.size());
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.LOST, nr.getNodeState());
-        
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
+
     // registering another AM gives it the complete failed list
     RMApp app2 = rm.submitApp(2000);
     // Trigger nm2 heartbeat so that AM gets launched on it
     nm2.nodeHeartbeat(true);
     RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
     MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
-    
+
     // register AM returns all unusable nodes
     am2.registerAppAttempt();
-    
+
     // allocate request returns no updated node
     AllocateRequest allocateRequest2 =
         AllocateRequest.newInstance(0, 0F, null, null, null);
@@ -177,39 +224,43 @@ public class TestAMRMRPCNodeUpdates {
         allocate(attempt2.getAppAttemptId(), allocateRequest2);
     updatedNodes = response2.getUpdatedNodes();
     Assert.assertEquals(0, updatedNodes.size());
-    
+
     syncNodeHeartbeat(nm4, true);
-    
+
     // both AM's should get delta updated nodes
     allocateRequest1 =
         AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
-          null);
+            null);
     response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
     updatedNodes = response1.getUpdatedNodes();
     Assert.assertEquals(1, updatedNodes.size());
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
-    
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
+
     allocateRequest2 =
         AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
-          null);
+            null);
     response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
     updatedNodes = response2.getUpdatedNodes();
     Assert.assertEquals(1, updatedNodes.size());
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
 
     // subsequent allocate calls should return no updated nodes
     allocateRequest2 =
         AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
-          null);
+            null);
     response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
     updatedNodes = response2.getUpdatedNodes();
     Assert.assertEquals(0, updatedNodes.size());
-    
+
     // how to do the above for LOST node
-  
+
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 330c5c672ca..caa7d1e2af0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -246,7 +246,7 @@ public class MockRMApp implements RMApp {
   }
 
   @Override
-  public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org