You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/11/02 17:31:34 UTC

[hadoop] branch branch-3.3 updated: YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2473e8b  YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).
2473e8b is described below

commit 2473e8b7114b42f20fc8ce48cdefe63b16385522
Author: Eric E Payne <ep...@apache.org>
AuthorDate: Mon Nov 2 17:16:28 2020 +0000

    YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).
---
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java      |   7 ++
 .../hadoop/yarn/sls/scheduler/RMNodeWrapper.java   |   7 ++
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  24 +++++
 .../src/main/resources/yarn-default.xml            |  50 ++++++++++
 .../yarn/server/resourcemanager/AdminService.java  |   8 ++
 .../server/resourcemanager/ClusterMetrics.java     |  27 ++++-
 .../resourcemanager/ResourceTrackerService.java    | 105 +++++++++++++++++---
 .../yarn/server/resourcemanager/rmnode/RMNode.java |   4 +
 .../server/resourcemanager/rmnode/RMNodeImpl.java  |  42 ++++++++
 .../scheduler/ClusterNodeTracker.java              |   5 +-
 .../yarn/server/resourcemanager/MockNodes.java     |   7 ++
 .../resourcemanager/TestRMNodeTransitions.java     | 109 +++++++++++++++++++++
 .../scheduler/TestClusterNodeTracker.java          |  18 ++++
 .../src/site/markdown/NodeManager.md               |  18 ++++
 14 files changed, 418 insertions(+), 13 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 4cf8aef..32567db 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -246,6 +246,13 @@ public class NodeInfo {
     @Override
     public void resetUpdatedCapability() {
     }
+
+    /**
+     * This SLS node does not scale the heartbeat interval: the configured
+     * default is returned unchanged, regardless of the min/max bounds and
+     * the speedup/slowdown factors.
+     */
+    @Override
+    public long calculateHeartBeatInterval(long defaultInterval,
+        long minInterval, long maxInterval, float speedupFactor,
+        float slowdownFactor) {
+      // No interval scaling for simulated nodes.
+      return defaultInterval;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 750b708..b5ae4f5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -231,4 +231,11 @@ public class RMNodeWrapper implements RMNode {
   @Override
   public void resetUpdatedCapability() {
   }
+
+  /**
+   * This wrapper does not scale the heartbeat interval: it always returns
+   * the configured default, ignoring the bounds and scaling factors.
+   */
+  @Override
+  public long calculateHeartBeatInterval(long defaultInterval,
+      long minInterval, long maxInterval, float speedupFactor,
+      float slowdownFactor) {
+    // Heartbeat interval scaling is not applied here.
+    return defaultInterval;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 456a9b6..26f3a62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -690,6 +690,30 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
   public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
 
+  /**
+   * Enable heartbeat interval scaling based on CPU utilization: each
+   * NodeManager's heartbeat interval is adjusted according to how its CPU
+   * utilization compares with the cluster-wide average.
+   */
+  public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
+      RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
+  public static final boolean
+      DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
+
+  /** Minimum NM heartbeat interval (ms) when interval scaling is enabled. */
+  public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
+      RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
+  public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
+
+  /** Maximum NM heartbeat interval (ms) when interval scaling is enabled. */
+  public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
+      RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
+  public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
+
+  /**
+   * Degree to which the interval is shortened for nodes whose CPU
+   * utilization is below the cluster average (1.0 = proportional to the
+   * utilization difference).
+   */
+  public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
+      RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
+  public static final float
+      DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
+
+  /**
+   * Degree to which the interval is lengthened for nodes whose CPU
+   * utilization is above the cluster average (1.0 = proportional to the
+   * utilization difference).
+   */
+  public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
+      RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
+  public static final float
+      DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
+
+
   /** Number of worker threads that write the history data. */
   public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 71974c2..b2041f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -855,6 +855,56 @@
   </property>
 
   <property>
+    <description>Enables heart-beat interval scaling.  The NodeManager
+      heart-beat interval will scale based on the difference between the CPU
+      utilization on the node and the cluster-wide average CPU utilization.
+    </description>
+    <name>
+      yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
+    </name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>If heart-beat interval scaling is enabled, this is the
+      minimum heart-beat interval in milliseconds.
+    </description>
+    <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>If heart-beat interval scaling is enabled, this is the
+      maximum heart-beat interval in milliseconds.</description>
+    <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>If heart-beat interval scaling is enabled, this controls
+      the degree of adjustment when speeding up heartbeat intervals.
+      At 1.0, a node 20 percentage points below the cluster-average CPU
+      utilization gets a 20% shorter interval (never below the minimum).
+    </description>
+    <name>
+      yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
+    </name>
+    <value>1.0</value>
+  </property>
+
+  <property>
+    <description>If heart-beat interval scaling is enabled, this controls
+      the degree of adjustment when slowing down heartbeat intervals.
+      At 1.0, a node 20 percentage points above the cluster-average CPU
+      utilization gets a 20% longer interval (never above the maximum).
+    </description>
+    <name>
+      yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
+    </name>
+    <value>1.0</value>
+  </property>
+
+  <property>
     <description>The minimum allowed version of a connecting nodemanager.  The valid values are
       NONE (no version checking), EqualToRM (the nodemanager's version is equal to
       or greater than the RM version), or a Version String.</description>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 44ffd17..0312280 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -730,6 +730,14 @@ public class AdminService extends CompositeService implements
       // refresh dynamic resource in ResourceTrackerService
       this.rm.getRMContext().getResourceTrackerService().
           updateDynamicResourceConfiguration(newConf);
+
+      // Update our heartbeat configuration as well
+      Configuration ysconf =
+          getConfiguration(new Configuration(false),
+              YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+      this.rm.getRMContext().getResourceTrackerService()
+        .updateHeartBeatConfiguration(ysconf);
+
       RMAuditLogger.logSuccess(user.getShortUserName(), operation,
               "AdminService");
       return response;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
index 7724537..764d807 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.api.records.Resource;
 import com.google.common.annotations.VisibleForTesting;
 
 @InterfaceAudience.Private
@@ -53,6 +54,8 @@ public class ClusterMetrics {
   private MutableRate aMContainerAllocationDelay;
   @Metric("Memory Utilization") MutableGaugeLong utilizedMB;
   @Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
+  // Aggregate node capability of the cluster (memory in MB, vcores),
+  // adjusted through incrCapability() / decrCapability().
+  @Metric("Memory Capability") MutableGaugeLong capabilityMB;
+  @Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
 
   private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
   "Metrics for the Yarn Cluster");
@@ -83,7 +86,7 @@ public class ClusterMetrics {
   }
 
+  /**
+   * Clears the singleton by resetting isInitialized and dropping INSTANCE.
+   * NOTE(review): presumably the next getMetrics() call then builds a fresh
+   * instance - confirm. Public so tests in other packages (for example
+   * TestClusterNodeTracker) can reset shared metrics state.
+   */
   @VisibleForTesting
-  synchronized static void destroy() {
+  public synchronized static void destroy() {
     isInitialized.set(false);
     INSTANCE = null;
   }
@@ -195,6 +198,28 @@ public class ClusterMetrics {
     aMRegisterDelay.add(delay);
   }
 
+  /** Returns the total memory capability, in MB, currently recorded. */
+  public long getCapabilityMB() {
+    return capabilityMB.value();
+  }
+
+  /** Returns the total vcore capability currently recorded. */
+  public long getCapabilityVirtualCores() {
+    return capabilityVirtualCores.value();
+  }
+
+  /**
+   * Adds a resource's memory and vcores to the cluster capability gauges.
+   * @param res resource to add; a null resource is ignored
+   */
+  public void incrCapability(Resource res) {
+    if (res == null) {
+      return;
+    }
+    capabilityMB.incr(res.getMemorySize());
+    capabilityVirtualCores.incr(res.getVirtualCores());
+  }
+
+  /**
+   * Subtracts a resource's memory and vcores from the capability gauges.
+   * @param res resource to remove; a null resource is ignored
+   */
+  public void decrCapability(Resource res) {
+    if (res == null) {
+      return;
+    }
+    capabilityMB.decr(res.getMemorySize());
+    capabilityVirtualCores.decr(res.getVirtualCores());
+  }
+
   public void addAMContainerAllocationDelay(long delay) {
     aMContainerAllocationDelay.add(delay);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 7d6feea..0200b06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -114,6 +113,13 @@ public class ResourceTrackerService extends AbstractService implements
   private final WriteLock writeLock;
 
   private long nextHeartBeatInterval;
+  // Heartbeat interval scaling settings, (re)loaded under writeLock by
+  // updateHeartBeatConfiguration() and read on the heartbeat-response path.
+  // NOTE(review): the visible heartbeat hunk reads these without taking the
+  // lock; consider volatile fields or a read lock if refreshed values must be
+  // promptly visible to the threads serving heartbeats - confirm.
+  private boolean heartBeatIntervalScalingEnable;
+  private long heartBeatIntervalMin;
+  private long heartBeatIntervalMax;
+  private float heartBeatIntervalSpeedupFactor;
+  private float heartBeatIntervalSlowdownFactor;
+
+
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
   private String minimumNodeManagerVersion;
@@ -157,14 +163,6 @@ public class ResourceTrackerService extends AbstractService implements
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
 
     RackResolver.init(conf);
-    nextHeartBeatInterval =
-        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
-            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
-    if (nextHeartBeatInterval <= 0) {
-      throw new YarnRuntimeException("Invalid Configuration. "
-          + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
-          + " should be larger than 0.");
-    }
 
     checkIpHostnameInRegistration = conf.getBoolean(
         YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
@@ -188,7 +186,7 @@ public class ResourceTrackerService extends AbstractService implements
       isDelegatedCentralizedNodeLabelsConf =
           YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
     }
-
+    updateHeartBeatConfiguration(conf);
     loadDynamicResourceConfiguration(conf);
     decommissioningWatcher.init(conf);
     super.serviceInit(conf);
@@ -233,6 +231,84 @@ public class ResourceTrackerService extends AbstractService implements
     }
   }
 
+  /**
+   * Update HeartBeatConfiguration with new configuration: reloads the
+   * default NM heartbeat interval plus the interval-scaling switch, bounds
+   * and factors. Invalid values are logged at WARN and replaced rather than
+   * rejected: a non-positive interval falls back to the default; when
+   * scaling is enabled, bounds violating
+   * {@code 0 < min <= interval <= max} are pinned to the interval, and
+   * negative scaling factors are reset to their defaults.
+   * Invoked from serviceInit() and from AdminService after a refresh.
+   * @param conf Yarn Configuration
+   */
+  public void updateHeartBeatConfiguration(Configuration conf) {
+    // All settings are replaced while holding the write lock.
+    this.writeLock.lock();
+    try {
+      nextHeartBeatInterval =
+          conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+      heartBeatIntervalScalingEnable =
+          conf.getBoolean(
+              YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
+              YarnConfiguration.
+                  DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
+      heartBeatIntervalMin =
+          conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
+              YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
+      heartBeatIntervalMax =
+          conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
+              YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
+      heartBeatIntervalSpeedupFactor =
+          conf.getFloat(
+              YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
+              YarnConfiguration.
+                  DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
+      heartBeatIntervalSlowdownFactor =
+          conf.getFloat(
+              YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
+              YarnConfiguration.
+                  DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
+
+      // Sanitize the base interval first; the bounds check below uses it.
+      // (This replaces the former hard failure on a non-positive value.)
+      if (nextHeartBeatInterval <= 0) {
+        LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
+            + " must be greater than 0, using default.");
+        nextHeartBeatInterval =
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
+      }
+
+      // Bounds and factors are only validated when scaling is enabled; the
+      // heartbeat path only consults them in that case.
+      if (heartBeatIntervalScalingEnable) {
+        // Bounds that do not bracket the interval are pinned to it, which
+        // effectively disables scaling.
+        if (heartBeatIntervalMin <= 0
+            || heartBeatIntervalMin > heartBeatIntervalMax
+            || nextHeartBeatInterval < heartBeatIntervalMin
+            || nextHeartBeatInterval > heartBeatIntervalMax) {
+          LOG.warn("Invalid NM Heartbeat Configuration. "
+              + "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
+              + heartBeatIntervalMin + " <= "
+              + nextHeartBeatInterval + " <= "
+              + heartBeatIntervalMax
+              + " Setting min and max to configured interval.");
+          heartBeatIntervalMin = nextHeartBeatInterval;
+          heartBeatIntervalMax = nextHeartBeatInterval;
+        }
+        // A single negative factor resets BOTH factors to their defaults.
+        if (heartBeatIntervalSpeedupFactor < 0
+            || heartBeatIntervalSlowdownFactor < 0) {
+          LOG.warn(
+              "Heartbeat scaling factors must be >= 0 "
+                  + " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
+                  + " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
+                  + ". Using Defaults");
+          heartBeatIntervalSlowdownFactor =
+              YarnConfiguration.
+                  DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
+          heartBeatIntervalSpeedupFactor =
+              YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
+        }
+        LOG.info("Heartbeat Scaling Configuration: "
+            + " defaultInterval:" + nextHeartBeatInterval
+            + " minimumInterval:" + heartBeatIntervalMin
+            + " maximumInterval:" + heartBeatIntervalMax
+            + " speedupFactor:" + heartBeatIntervalSpeedupFactor
+            + " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
@@ -629,10 +705,17 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     // Heartbeat response
+    long newInterval = nextHeartBeatInterval;
+    if (heartBeatIntervalScalingEnable) {
+      newInterval = rmNode.calculateHeartBeatInterval(
+          nextHeartBeatInterval, heartBeatIntervalMin,
+          heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
+          heartBeatIntervalSlowdownFactor);
+    }
     NodeHeartbeatResponse nodeHeartBeatResponse =
         YarnServerBuilderUtils.newNodeHeartbeatResponse(
             getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
-            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
+            NodeAction.NORMAL, null, null, null, null, newInterval);
     rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
 
     populateKeys(request, nodeHeartBeatResponse);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d3b515e..e6205d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -212,4 +212,8 @@ public interface RMNode {
    * @return all node attributes as a Set.
    */
   Set<NodeAttribute> getAllNodeAttributes();
+
+  /**
+   * Computes the heartbeat interval to send back to this node's NodeManager.
+   * Implementations may simply return {@code defaultInterval} (for example
+   * when no utilization data is available).
+   *
+   * @param defaultInterval configured default heartbeat interval (ms)
+   * @param minInterval lower bound on a scaled interval (ms)
+   * @param maxInterval upper bound on a scaled interval (ms)
+   * @param speedupFactor how strongly to shorten the interval for a node
+   *     below the cluster-average CPU utilization
+   * @param slowdownFactor how strongly to lengthen the interval for a node
+   *     above the cluster-average CPU utilization
+   * @return the heartbeat interval in milliseconds
+   */
+  long calculateHeartBeatInterval(long defaultInterval,
+      long minInterval, long maxInterval, float speedupFactor,
+      float slowdownFactor);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 48d92e2..48fad5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -716,6 +716,48 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  /**
+   * Scales the NodeManager heartbeat interval by comparing this node's CPU
+   * utilization with the cluster-wide average.
+   *
+   * <p>Cluster utilization is utilized vcores divided by total vcore
+   * capability (from {@link ClusterMetrics}). Node utilization is the
+   * reported CPU usage divided by the node's physical vcores. A node busier
+   * than the cluster gets a longer interval, a less busy node a shorter one.
+   * The change is proportional to the utilization difference times the
+   * corresponding factor, and the result is clamped to
+   * [minInterval, maxInterval].
+   *
+   * @param defaultInterval interval used when utilizations are equal; it is
+   *     returned unchanged when node utilization or physical resource is
+   *     unknown
+   * @param minInterval lower bound on the scaled interval
+   * @param maxInterval upper bound on the scaled interval
+   * @param speedupFactor multiplier applied when the node is below average
+   * @param slowdownFactor multiplier applied when the node is above average
+   * @return the heartbeat interval to send to the NodeManager
+   */
+  @Override
+  public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
+      long maxInterval, float speedupFactor, float slowdownFactor) {
+
+    long newInterval = defaultInterval;
+
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    float clusterUtil = metrics.getUtilizedVirtualCores()
+        / Math.max(1.0f, metrics.getCapabilityVirtualCores());
+
+    // Read each value once so the null check and the later dereference see
+    // the same object, even if a setter replaces it concurrently.
+    ResourceUtilization utilization = this.nodeUtilization;
+    Resource physicalResource = this.getPhysicalResource();
+
+    if (utilization != null && physicalResource != null) {
+      // getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
+      // a physicalResource returns number of physical cores. So,
+      // nodeUtil will be CPU utilization of entire node.
+      float nodeUtil = utilization.getCPU()
+          / Math.max(1.0f, physicalResource.getVirtualCores());
+
+      // Clamp both utilizations to [0, 1].
+      nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
+      clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
+
+      if (nodeUtil > clusterUtil) {
+        // Slow down: 20 points more CPU utilization -> 20% * factor longer.
+        newInterval = (long) (defaultInterval
+            * (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
+      } else {
+        // Speed up: 20 points less CPU utilization -> 20% * factor shorter.
+        newInterval = (long) (defaultInterval
+            * (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
+      }
+      newInterval =
+          Math.min(maxInterval, Math.max(minInterval, newInterval));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting heartbeatinterval to: {} node:{} nodeUtil: {}"
+            + " clusterUtil: {}", newInterval, this.nodeId, nodeUtil,
+            clusterUtil);
+      }
+    }
+    return newInterval;
+  }
+
   public void handle(RMNodeEvent event) {
     LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType());
     writeLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 50c45fc..973f11b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -106,6 +107,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       // Update cluster capacity
       Resources.addTo(clusterCapacity, node.getTotalResource());
       staleClusterCapacity = Resources.clone(clusterCapacity);
+      ClusterMetrics.getMetrics().incrCapability(node.getTotalResource());
 
       // Update maximumAllocation
       updateMaxResources(node, true);
@@ -201,6 +203,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       // Update cluster capacity
       Resources.subtractFrom(clusterCapacity, node.getTotalResource());
       staleClusterCapacity = Resources.clone(clusterCapacity);
+      ClusterMetrics.getMetrics().decrCapability(node.getTotalResource());
 
       // Update maximumAllocation
       updateMaxResources(node, false);
@@ -493,4 +496,4 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     }
     return nodesPerPartition;
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 600edfc..f9cb99c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -335,6 +335,13 @@ public class MockNodes {
     public Resource getPhysicalResource() {
       return this.physicalResource;
     }
+
+    /**
+     * Mock nodes never scale the heartbeat interval; the configured default
+     * is returned regardless of the bounds and scaling factors.
+     */
+    @Override
+    public long calculateHeartBeatInterval(long defaultInterval,
+        long minInterval, long maxInterval, float speedupFactor,
+        float slowdownFactor) {
+      // No scaling in the mock.
+      return defaultInterval;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 931a2e7..b21bf39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -1167,4 +1168,112 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(1, rmNode.getContainersToBeRemovedFromNM().size());
 
   }
+
+  /**
+   * Sets the node's CPU utilization to {@code cpuUtil}, recomputes the
+   * heartbeat interval and asserts that it equals {@code expectedHb}.
+   */
+  private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil,
+      long hbDefault, long hbMin, long hbMax, float speedup, float slowdown,
+      float cpuUtil, long expectedHb) {
+    nodeUtil.setCPU(cpuUtil);
+    rmNode.setNodeUtilization(nodeUtil);
+    long actualHb = rmNode.calculateHeartBeatInterval(
+        hbDefault, hbMin, hbMax, speedup, slowdown);
+    assertEquals("heartbeat interval incorrect", expectedHb, actualHb);
+  }
+
+  /**
+   * Sweeps node CPU utilization from 0% to 100% against a cluster at 50%
+   * utilization, checking the scaled heartbeat interval for factors 1.0 and
+   * 0.5. Then verifies that a missing physical resource disables scaling.
+   */
+  @Test
+  public void testCalculateHeartBeatInterval() {
+    RMNodeImpl rmNode = getRunningNode();
+    Resource nodeCapability = rmNode.getTotalCapability();
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    int vcoreUnit = nodeCapability.getVirtualCores();
+    rmNode.setPhysicalResource(nodeCapability);
+
+    // Cluster capability = 10 * nodeCapability, with half of its vcores in
+    // use, i.e. 50% cluster CPU utilization.
+    int clusterVcores = vcoreUnit * 10;
+    Resource clusterCapability = Resource.newInstance(
+        10 * nodeCapability.getMemorySize(), clusterVcores);
+    int clusterUsedVcores = vcoreUnit * 5;
+    metrics.incrCapability(clusterCapability);
+    metrics.incrUtilizedVirtualCores(clusterUsedVcores);
+    try {
+      long hbDefault = 2000;
+      long hbMin = 1500;
+      long hbMax = 2500;
+      ResourceUtilization nodeUtil = ResourceUtilization.newInstance(
+          1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util
+
+      // Node CPU utilization (fraction of the node's vcores) for each step.
+      float[] utilSteps = {0.0F, 0.10F, 0.20F, 0.30F, 0.40F, 0.50F, 0.60F,
+          0.70F, 0.80F, 0.90F, 1.0F};
+
+      // Factors of 1.0: the interval moves 1:1 with the utilization gap.
+      long[] expectedFullFactor = {hbMin, hbMin, hbMin, 1600, 1800,
+          hbDefault, 2200, 2400, hbMax, hbMax, hbMax};
+      for (int i = 0; i < utilSteps.length; i++) {
+        calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+            1.0F, 1.0F, vcoreUnit * utilSteps[i], expectedFullFactor[i]);
+      }
+
+      // Factors of 0.5: the interval moves half as much.
+      long[] expectedHalfFactor = {hbMin, 1600, 1700, 1800, 1900,
+          hbDefault, 2100, 2200, 2300, 2400, hbMax};
+      for (int i = 0; i < utilSteps.length; i++) {
+        calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+            0.5F, 0.5F, vcoreUnit * utilSteps[i], expectedHalfFactor[i]);
+      }
+
+      // With Physical Resource null, it should always return default
+      rmNode.setPhysicalResource(null);
+      calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+          0.5F, 0.5F, vcoreUnit * 0.1F, hbDefault); // 10%
+      calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
+          0.5F, 0.5F, vcoreUnit * 1.0F, hbDefault); // 100%
+    } finally {
+      // ClusterMetrics is a JVM-wide singleton: undo this test's changes so
+      // they cannot leak into other tests sharing the JVM.
+      metrics.decrCapability(clusterCapability);
+      metrics.incrUtilizedVirtualCores(-clusterUsedVcores);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
index c1703bc..14eca5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,12 +42,19 @@ import static org.junit.Assert.assertEquals;
  */
 public class TestClusterNodeTracker {
   private ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+  private ClusterMetrics metrics;
 
   @Before
   public void setup() {
+    metrics = ClusterMetrics.getMetrics(); // destroyed in teardown()
     nodeTracker = new ClusterNodeTracker<>();
   }
 
+  @After
+  public void teardown() {
+    ClusterMetrics.destroy(); // presumably clears metrics shared across tests
+  }
+
   private void addEight4x4Nodes() {
     MockNodes.resetHostIds();
     List<RMNode> rmNodes =
@@ -66,6 +75,15 @@ public class TestClusterNodeTracker {
   }
 
   @Test
+  public void testIncrCapability() { // metrics must track added nodes
+    addEight4x4Nodes(); // totals below assume 8 nodes x (4096 MB, 4 vcores)
+    assertEquals("Cluster Capability Memory incorrect",
+        4096 * 8, metrics.getCapabilityMB());
+    assertEquals("Cluster Capability Vcores incorrect",
+        4 * 8, metrics.getCapabilityVirtualCores());
+  }
+
+  @Test
   public void testGetNodesForResourceName() throws Exception {
     addEight4x4Nodes();
     assertEquals("Incorrect number of nodes matching ANY",
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
index d8c368c..596a47e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/NodeManager.md
@@ -219,3 +219,21 @@ The following parameters can be used to configure the container log dir sizes.
 | `yarn.nodemanager.container-log-monitor.interval-ms` | Positive integer | How often to check the usage of a container's log directories in milliseconds. Default is 60000 ms. |
 | `yarn.nodemanager.container-log-monitor.dir-size-limit-bytes` | Long | The disk space limit, in bytes, for a single container log directory. Default is 1000000000. |
 | `yarn.nodemanager.container-log-monitor.total-size-limit-bytes` | Long | The disk space limit, in bytes, for all of a container's logs. The default is 10000000000. |
+
+Scale Heart-beat Interval Based on CPU Utilization
+-------------------------------------------------
+
+This feature allows a cluster admin to scale the heart-beat interval between the ResourceManager and each NodeManager based on the CPU utilization of the node compared to the overall CPU utilization of the cluster.
+
+### Configuration
+
+The following parameters can be used to configure the heart-beat interval and whether and how it scales.
+
+| Configuration Name | Allowed Values | Description |
+|:---- |:---- |:---- |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-ms` | Long | Specifies the default heart-beat interval in milliseconds for every NodeManager in the cluster. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable` | true, false | Enables heart-beat interval scaling. If true, the NodeManager heart-beat interval will scale based on the difference between the CPU utilization on the node and the cluster-wide average CPU utilization. Default is false. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the minimum heart-beat interval in milliseconds. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms` | Positive Long | If heart-beat interval scaling is enabled, this is the maximum heart-beat interval in milliseconds. Default is 1000 ms. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when speeding up heart-beat intervals. At 1.0, a node CPU utilization 20% below the cluster-wide average CPU utilization results in a 20% decrease in the heart-beat interval. Default is 1.0. |
+| `yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor` | Positive Float | If heart-beat interval scaling is enabled, this controls the degree of adjustment when slowing down heart-beat intervals. At 1.0, a node CPU utilization 20% above the cluster-wide average CPU utilization results in a 20% increase in the heart-beat interval. Default is 1.0. |


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org