You are viewing a plain-text version of this content. (The link to its canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:29:07 UTC

[28/37] hadoop git commit: YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.

YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9c72d04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9c72d04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9c72d04

Branch: refs/heads/HDFS-7240
Commit: e9c72d04beddfe0252d2e81123a9fe66bdf04078
Parents: 3400d0c
Author: Sunil G <su...@apache.org>
Authored: Mon Jan 29 20:43:08 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Jan 29 20:44:38 2018 +0530

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        |  51 +++---
 .../scheduler/SchedulerNode.java                |  16 ++
 .../scheduler/capacity/CapacityScheduler.java   |  49 +++++-
 .../TestRMHAForAsyncScheduler.java              |  38 ++++-
 .../TestCapacitySchedulerAsyncScheduling.java   | 159 ++++++++++++++++++-
 5 files changed, 276 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c94c379..4b76327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -980,11 +980,11 @@ public abstract class AbstractYarnScheduler
   /**
    * Get lists of new containers from NodeManager and process them.
    * @param nm The RMNode corresponding to the NodeManager
+   * @param schedulerNode The SchedulerNode corresponding to the NodeManager
    * @return list of completed containers
    */
-  protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
-
+  private List<ContainerStatus> updateNewContainerInfo(RMNode nm,
+      SchedulerNode schedulerNode) {
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers =
         new ArrayList<>();
@@ -999,14 +999,15 @@ public abstract class AbstractYarnScheduler
 
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      containerLaunchedOnNode(launchedContainer.getContainerId(),
+          schedulerNode);
     }
 
     // Processing the newly increased containers
     List<Container> newlyIncreasedContainers =
         nm.pullNewlyIncreasedContainers();
     for (Container container : newlyIncreasedContainers) {
-      containerIncreasedOnNode(container.getId(), node, container);
+      containerIncreasedOnNode(container.getId(), schedulerNode, container);
     }
 
     return completedContainers;
@@ -1017,12 +1018,12 @@ public abstract class AbstractYarnScheduler
    * @param completedContainers Extracted list of completed containers
    * @param releasedResources Reference resource object for completed containers
    * @param nodeId NodeId corresponding to the NodeManager
+   * @param schedulerNode SchedulerNode for nodeId; may be null
    * @return The total number of released containers
    */
-  protected int updateCompletedContainers(List<ContainerStatus>
-      completedContainers, Resource releasedResources, NodeId nodeId) {
+  private int updateCompletedContainers(List<ContainerStatus> completedContainers,
+      Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) {
     int releasedContainers = 0;
-    SchedulerNode node = getNode(nodeId);
     List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
@@ -1030,8 +1031,8 @@ public abstract class AbstractYarnScheduler
       RMContainer container = getRMContainer(containerId);
       completedContainer(container,
           completedContainer, RMContainerEventType.FINISHED);
-      if (node != null) {
-        node.releaseContainer(containerId, true);
+      if (schedulerNode != null) {
+        schedulerNode.releaseContainer(containerId, true);
       }
 
       if (container != null) {
@@ -1076,14 +1077,14 @@ public abstract class AbstractYarnScheduler
   /**
    * Update container and utilization information on the NodeManager.
    * @param nm The NodeManager to update
+   * @param schedulerNode The SchedulerNode to update; must not be null
    */
-  protected void updateNodeResourceUtilization(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
+  protected void updateNodeResourceUtilization(RMNode nm,
+      SchedulerNode schedulerNode) {
     // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
+    schedulerNode.setAggregatedContainersUtilization(
         nm.getAggregatedContainersUtilization());
-    node.setNodeUtilization(nm.getNodeUtilization());
-
+    schedulerNode.setNodeUtilization(nm.getNodeUtilization());
   }
 
   /**
@@ -1097,12 +1098,20 @@ public abstract class AbstractYarnScheduler
     }
 
     // Process new container information
-    List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
+    SchedulerNode schedulerNode = getNode(nm.getNodeID());
+    List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
+        schedulerNode);
+
+    // Notify Scheduler Node updated. getNode() may return null when the
+    // node is no longer tracked, so guard as updateCompletedContainers does.
+    if (schedulerNode != null) {
+      schedulerNode.notifyNodeUpdate();
+    }
 
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
     int releasedContainers = updateCompletedContainers(completedContainers,
-        releasedResources, nm.getNodeID());
+        releasedResources, nm.getNodeID(), schedulerNode);
 
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
@@ -1115,18 +1124,19 @@ public abstract class AbstractYarnScheduler
           .getEventHandler()
           .handle(
               new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(nm.getNodeID())
-                      .getAllocatedResource(), 0)));
+                  .newInstance(schedulerNode.getAllocatedResource(), 0)));
     }
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
-    updateNodeResourceUtilization(nm);
+    if (schedulerNode != null) {
+      updateNodeResourceUtilization(nm, schedulerNode);
+    }
 
     // Now node data structures are up-to-date and ready for scheduling.
-    if(LOG.isDebugEnabled()) {
-      SchedulerNode node = getNode(nm.getNodeID());
-      LOG.debug("Node being looked for scheduling " + nm +
-          " availableResource: " + node.getUnallocatedResource());
+    if (LOG.isDebugEnabled() && schedulerNode != null) {
+      LOG.debug(
+          "Node being looked for scheduling " + nm + " availableResource: "
+              + schedulerNode.getUnallocatedResource());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 05dbf1e..89f748d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -76,6 +77,9 @@ public abstract class SchedulerNode {
 
   private volatile Set<String> labels = null;
 
+  // Time.monotonicNow() value (ms) recorded at the last node heartbeat.
+  private volatile long lastHeartbeatMonotonicTime;
+
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
       Set<String> labels) {
     this.rmNode = node;
@@ -87,6 +91,7 @@ public abstract class SchedulerNode {
       nodeName = rmNode.getHostName();
     }
     this.labels = ImmutableSet.copyOf(labels);
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
   }
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
@@ -453,6 +458,22 @@ public abstract class SchedulerNode {
     return this.nodeUtilization;
   }
 
+  /**
+   * Get the time of the last node heartbeat.
+   * @return last heartbeat time in ms, as returned by
+   *         {@link Time#monotonicNow()}
+   */
+  public long getLastHeartbeatMonotonicTime() {
+    return lastHeartbeatMonotonicTime;
+  }
+
+  /**
+   * Refresh the last heartbeat time to now. This will be called for each
+   * node heartbeat.
+   */
+  public void notifyNodeUpdate() {
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
+  }
 
   private static class ContainerInfo {
     private final RMContainer container;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 99f4456..03ca507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -181,8 +180,6 @@ public class CapacityScheduler extends
 
   private CSConfigurationProvider csConfProvider;
 
-  protected Clock monotonicClock;
-
   @Override
   public void setConf(Configuration conf) {
       yarnConf = conf;
@@ -243,6 +240,10 @@ public class CapacityScheduler extends
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
 
+  // Whether schedule() already printed its throttled skipped-node DEBUG
+  // logging in the current logging window; reset outside that window.
+  private static boolean printedVerboseLoggingForAsyncScheduling = false;
+
   /**
    * EXPERT
    */
@@ -471,6 +472,31 @@ public class CapacityScheduler extends
 
   private final static Random random = new Random(System.currentTimeMillis());
 
+  /**
+   * Decide whether async scheduling should skip a node that has not
+   * heartbeated for more than two NM heartbeat intervals.
+   * @param node the node to check
+   * @param cs the scheduler whose NM heartbeat interval is used
+   * @param printVerboseLog whether a DEBUG message may be logged on skip
+   * @return true if no containers should be allocated on the node now
+   */
+  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+      CapacityScheduler cs, boolean printVerboseLog) {
+    // Skip node which missed 2 heartbeats since the node might be dead and
+    // we should not continue to allocate containers on it.
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        LOG.debug("Skip scheduling on node " + node.getNodeID()
+            + " because it hasn't heartbeated for "
+            + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
+      }
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Schedule on all nodes by starting at a random point.
    * @param cs
@@ -481,16 +507,42 @@ public class CapacityScheduler extends
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
 
+    // To avoid too verbose DEBUG logging, only print debug log once for
+    // every 10 secs.
+    boolean printSkippedNodeLogging = false;
+    if (Time.monotonicNow() / 1000 % 10 == 0) {
+      printSkippedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+    } else {
+      printedVerboseLoggingForAsyncScheduling = false;
+    }
+
+    // Allocate containers of node [start, end)
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
+        if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
+          continue;
+        }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
     }
-    // Now, just get everyone to be safe
+
+    current = 0;
+
+    // Allocate containers of node [0, start), excluding start itself.
     for (FiCaSchedulerNode node : nodes) {
+      if (current++ >= start) {
+        break;
+      }
+      if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
+        continue;
+      }
       cs.allocateContainersToNode(node.getNodeID(), false);
     }
 
+    if (printSkippedNodeLogging) {
+      printedVerboseLoggingForAsyncScheduling = true;
+    }
+
     Thread.sleep(cs.getAsyncScheduleInterval());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
index 46d5cda..36f1762 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
@@ -28,13 +28,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class TestRMHAForAsyncScheduler extends RMHATestBase {
+  private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread
+      nmHeartbeatThread = null;
 
   @Before
   @Override
@@ -57,26 +64,53 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
   }
 
+  @After
+  public void stopNMHeartbeatThread() {
+    // Stop the heartbeat thread even if the test failed before pausing it.
+    pauseNMHeartbeat();
+  }
+
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    // Stop any previous heartbeat thread before starting a new one.
+    pauseNMHeartbeat();
+    nmHeartbeatThread =
+        new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs,
+            interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
   @Test(timeout = 60000)
   public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
     // start two RMs, and transit rm1 to active, rm2 to standby
     startRMs();
     // register NM
-    rm1.registerNode("h1:1234", 8192, 8);
+    MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
     // submit app1 and check
     RMApp app1 = submitAppAndCheckLaunched(rm1);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
 
     // failover RM1 to RM2
     explicitFailover();
     checkAsyncSchedulerThreads(Thread.currentThread());
+    pauseNMHeartbeat();
 
     // register NM, kill app1
-    rm2.registerNode("h1:1234", 8192, 8);
+    nm = rm2.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
     rm2.killApp(app1.getApplicationId());
     // submit app3 and check
     RMApp app2 = submitAppAndCheckLaunched(rm2);
+    pauseNMHeartbeat();
 
     // failover RM2 to RM1
     HAServiceProtocol.StateChangeRequestInfo requestInfo =
@@ -92,12 +126,15 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
     checkAsyncSchedulerThreads(Thread.currentThread());
 
     // register NM, kill app2
-    rm1.registerNode("h1:1234", 8192, 8);
+    nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
     rm1.killApp(app2.getApplicationId());
     // submit app3 and check
     submitAppAndCheckLaunched(rm1);
+    pauseNMHeartbeat();
 
     rm1.stop();
     rm2.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 77596e2..548b909 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling {
 
   RMNodeLabelsManager mgr;
 
+  private NMHeartbeatThread nmHeartbeatThread = null;
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -122,9 +125,11 @@ public class TestCapacitySchedulerAsyncScheduling {
     List<MockNM> nms = new ArrayList<>();
     // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
     for (int i = 0; i < 10; i++) {
-      nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
     }
 
+    keepNMHeartbeat(nms, 1000);
+
     List<MockAM> ams = new ArrayList<>();
     // Add 3 applications to the cluster, one app in one queue
     // the i-th app ask (20 * i) containers. So in total we will have
@@ -185,8 +190,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB);
     List<MockNM> nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm2);
@@ -277,8 +282,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB);
 
     // init scheduler nodes
     int waitTime = 1000;
@@ -416,8 +421,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB);
     List<MockNM> nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm2);
@@ -476,6 +481,165 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
   }
 
+  /**
+   * Make sure scheduler skips NMs which haven't heartbeated for a while.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+    }
+
+    List<MockAM> ams = new ArrayList<>();
+
+    keepNMHeartbeat(nms, heartbeatInterval);
+
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+          Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    pauseNMHeartbeat();
+
+    // Let every NM miss more than two heartbeat intervals.
+    Thread.sleep(heartbeatInterval * 3);
+
+    // Applications request containers.
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+    }
+
+    for (int i = 0; i < 5; i++) {
+      // Do heartbeat for NM 0-4
+      nms.get(i).nodeHeartbeat(true);
+    }
+
+    // Wait for 2000 ms.
+    Thread.sleep(2000);
+
+    // NM 0-4 heartbeated again and must have received non-AM containers;
+    // NM 5-9 stayed silent, so the scheduler must have skipped them.
+    for (int i = 0; i < nms.size(); i++) {
+      int numNonAMContainers = checkNumNonAMContainersOnNode(cs, nms.get(i));
+      if (i < 5) {
+        Assert.assertTrue("NM " + i + " should have non-AM containers",
+            numNonAMContainers > 0);
+      } else {
+        Assert.assertEquals("NM " + i + " should have no non-AM containers",
+            0, numNonAMContainers);
+      }
+    }
+
+    rm.close();
+  }
+
+  /**
+   * Heartbeats each given mock NM, sleeps {@code interval} ms and repeats,
+   * until {@link #setShouldStop()} is called or the thread is interrupted.
+   */
+  public static class NMHeartbeatThread extends Thread {
+    private final List<MockNM> mockNMS;
+    private final int interval;
+    private volatile boolean shouldStop = false;
+
+    public NMHeartbeatThread(List<MockNM> mockNMs, int interval) {
+      this.mockNMS = mockNMs;
+      this.interval = interval;
+    }
+
+    @Override
+    public void run() {
+      while (!shouldStop) {
+        for (MockNM nm : mockNMS) {
+          try {
+            nm.nodeHeartbeat(true);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and stop heartbeating.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /**
+     * Ask the thread to stop; it exits before its next heartbeat round.
+     */
+    public void setShouldStop() {
+      shouldStop = true;
+    }
+  }
+
+  @org.junit.After
+  public void stopNMHeartbeatThread() {
+    // Stop the heartbeat thread even if the test failed before pausing it.
+    pauseNMHeartbeat();
+  }
+
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    // Stop any previous heartbeat thread before starting a new one.
+    pauseNMHeartbeat();
+    nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
+  /**
+   * Count the running non-AM containers on the scheduler node of nm.
+   */
+  private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
+    SchedulerNode node = cs.getNode(nm.getNodeId());
+    int nonAMContainer = 0;
+    for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+      if (!c.isAMContainer()) {
+        nonAMContainer++;
+      }
+    }
+    return nonAMContainer;
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org