Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2021/02/03 04:04:10 UTC

[hadoop] branch trunk updated: YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu

This is an automated email from the ASF dual-hosted git repository.

bibinchundatt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fc26ad  YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu
6fc26ad is described below

commit 6fc26ad5392a2a61ace60b88ed931fed3859365d
Author: bibinchundatt <bi...@apache.org>
AuthorDate: Wed Feb 3 08:50:45 2021 +0530

    YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  22 +++++
 .../src/main/resources/yarn-default.xml            |   7 ++
 .../scheduler/AbstractYarnScheduler.java           |   6 ++
 .../resourcemanager/scheduler/SchedulerUtils.java  |   8 ++
 .../scheduler/capacity/CapacityScheduler.java      |  98 +++++++++++++------
 .../scheduler/placement/MultiNodeSorter.java       |   2 +-
 .../placement/MultiNodeSortingManager.java         |  43 +++++++-
 .../TestCapacitySchedulerAsyncScheduling.java      | 108 ++++++++++++++++++++-
 .../capacity/TestCapacitySchedulerMultiNodes.java  |  38 ++++++++
 ...tCapacitySchedulerMultiNodesWithPreemption.java |   1 +
 ...esSchedulerActivitiesWithMultiNodesEnabled.java |   1 +
 11 files changed, 298 insertions(+), 36 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5fa3ea9..d56fc64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -712,6 +712,14 @@ public class YarnConfiguration extends Configuration {
   public static final float
       DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
 
+  /**
+   * Number of consecutive missed heartbeats after which a node will be
+   * skipped from scheduling.
+   */
+  public static final String SCHEDULER_SKIP_NODE_MULTIPLIER =
+      YARN_PREFIX + "scheduler.skip.node.multiplier";
+  public static final int DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER = 2;
+
   /** Number of worker threads that write the history data. */
   public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
@@ -4751,6 +4759,20 @@ public class YarnConfiguration extends Configuration {
         DEFAULT_NM_NUMA_AWARENESS_ENABLED);
   }
 
+  /**
+   * Returns the timeout after which a node that has not heartbeated is
+   * skipped from scheduling.
+   * @param conf the configuration
+   * @return timeout in milliseconds.
+   */
+  public static long getSkipNodeInterval(Configuration conf) {
+    long heartbeatIntvl = conf.getLong(
+         YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+         YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+    int multiplier = conf.getInt(SCHEDULER_SKIP_NODE_MULTIPLIER,
+        DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER);
+    return multiplier * heartbeatIntvl;
+  }
+
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);
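
For quick reference, here is a minimal, self-contained sketch of how the effective skip window falls out of the two settings added above; the class name and the concrete values are illustrative only, while the constants and getSkipNodeInterval() come from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SkipNodeIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Illustrative values: a 1000 ms NM heartbeat interval combined with
        // the default multiplier of 2 yields a 2000 ms window before a node
        // is skipped.
        conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000L);
        conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, 2);
        System.out.println("skip-node interval (ms) = "
            + YarnConfiguration.getSkipNodeInterval(conf)); // prints 2000
      }
    }
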
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4349d56..23eba6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -911,6 +911,13 @@
   </property>
 
   <property>
+    <description>The number of consecutive missed heartbeats after which a node will be
+    skipped from scheduling.</description>
+    <name>yarn.scheduler.skip.node.multiplier</name>
+    <value>2</value>
+  </property>
+
+  <property>
     <description>The minimum allowed version of a connecting nodemanager.  The valid values are
       NONE (no version checking), EqualToRM (the nodemanager's version is equal to
       or greater than the RM version), or a Version String.</description>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 4144236..f95b30b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -159,6 +159,7 @@ public abstract class AbstractYarnScheduler
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
   protected long nmHeartbeatInterval;
+  private long skipNodeInterval;
 
   private final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -210,6 +211,7 @@ public abstract class AbstractYarnScheduler
     nmHeartbeatInterval =
         conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+    skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
@@ -368,6 +370,10 @@ public abstract class AbstractYarnScheduler
     return lastNodeUpdateTime;
   }
 
+  public long getSkipNodeInterval() {
+    return skipNodeInterval;
+  }
+
   protected void containerLaunchedOnNode(
       ContainerId containerId, SchedulerNode node) {
     readLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 58e2597..abb274e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -602,4 +603,11 @@ public class SchedulerUtils {
     node.allocateContainer(rmContainer);
     return rmContainer;
   }
+
+  public static boolean isNodeHeartbeated(SchedulerNode node,
+      long skipNodeInterval) {
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    return timeElapsedFromLastHeartbeat <= skipNodeInterval;
+  }
 }
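
To illustrate the semantics of the new isNodeHeartbeated helper, here is a small standalone sketch; FakeNode is a hypothetical stand-in for SchedulerNode that only mimics getLastHeartbeatMonotonicTime(), and the 2000 ms window is an arbitrary example:

    import org.apache.hadoop.util.Time;

    public class HeartbeatCheckSketch {
      // Hypothetical stand-in for SchedulerNode; only the last-heartbeat
      // timestamp matters for the check.
      static final class FakeNode {
        private final long lastHeartbeatMonotonicTime;
        FakeNode(long lastHeartbeatMonotonicTime) {
          this.lastHeartbeatMonotonicTime = lastHeartbeatMonotonicTime;
        }
        long getLastHeartbeatMonotonicTime() {
          return lastHeartbeatMonotonicTime;
        }
      }

      // Same rule as SchedulerUtils.isNodeHeartbeated: a node counts as live
      // while the time since its last heartbeat stays within the skip interval.
      static boolean isHeartbeated(FakeNode node, long skipNodeIntervalMs) {
        return Time.monotonicNow() - node.getLastHeartbeatMonotonicTime()
            <= skipNodeIntervalMs;
      }

      public static void main(String[] args) {
        FakeNode fresh = new FakeNode(Time.monotonicNow());
        FakeNode stale = new FakeNode(Time.monotonicNow() - 10_000);
        System.out.println(isHeartbeated(fresh, 2000)); // true
        System.out.println(isHeartbeated(stale, 2000)); // false
      }
    }
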
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1bb74a0..158c9cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -232,7 +233,7 @@ public class CapacityScheduler extends
 
   private CapacitySchedulerAutoQueueHandler autoQueueHandler;
 
-  private static boolean printedVerboseLoggingForAsyncScheduling = false;
+  private boolean printedVerboseLoggingForAsyncScheduling;
 
   /**
    * EXPERT
@@ -518,22 +519,47 @@ public class CapacityScheduler extends
 
   private final static Random random = new Random(System.currentTimeMillis());
 
-  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+  @VisibleForTesting
+  public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
       CapacityScheduler cs, boolean printVerboseLog) {
-    // Skip node which missed 2 heartbeats since the node might be dead and
-    // we should not continue allocate containers on that.
-    long timeElapsedFromLastHeartbeat =
-        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
-    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+    // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+    // heartbeats since the node might be dead and we should not continue
+    // to allocate containers on it.
+    if (!SchedulerUtils.isNodeHeartbeated(node, cs.getSkipNodeInterval())) {
       if (printVerboseLog && LOG.isDebugEnabled()) {
-        LOG.debug("Skip scheduling on node because it haven't heartbeated for "
+        long timeElapsedFromLastHeartbeat =
+            Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+        LOG.debug("Skip scheduling on node " + node.getNodeID()
+            + " because it haven't heartbeated for "
             + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
       }
       return true;
     }
+
+    if (node.getRMNode().getState() != NodeState.RUNNING) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        LOG.debug("Skip scheduling on node because it is in " +
+            node.getRMNode().getState() + " state");
+      }
+      return true;
+    }
     return false;
   }
 
+  private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) {
+    // To avoid overly verbose DEBUG logging, only print the debug log once
+    // every 10 seconds.
+    boolean printSkippedNodeLogging = false;
+    if (LOG.isDebugEnabled()) {
+      if (Time.monotonicNow() / 1000 % 10 == 0) {
+        printSkippedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling);
+      } else {
+        cs.printedVerboseLoggingForAsyncScheduling = false;
+      }
+    }
+    return printSkippedNodeLogging;
+  }
+
   /**
    * Schedule on all nodes by starting at a random point.
    * Schedule on all partitions by starting at a random partition
@@ -555,19 +581,12 @@ public class CapacityScheduler extends
     if (!cs.multiNodePlacementEnabled) {
       int start = random.nextInt(nodeSize);
 
-      // To avoid too verbose DEBUG logging, only print debug log once for
-      // every 10 secs.
-      boolean printSkipedNodeLogging = false;
-      if (Time.monotonicNow() / 1000 % 10 == 0) {
-        printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
-      } else {
-        printedVerboseLoggingForAsyncScheduling = false;
-      }
+      boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs);
 
       // Allocate containers of node [start, end)
       for (FiCaSchedulerNode node : nodes) {
         if (current++ >= start) {
-          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+          if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
             continue;
           }
           cs.allocateContainersToNode(node.getNodeID(), false);
@@ -581,14 +600,14 @@ public class CapacityScheduler extends
         if (current++ > start) {
           break;
         }
-        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+        if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
           continue;
         }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
 
-      if (printSkipedNodeLogging) {
-        printedVerboseLoggingForAsyncScheduling = true;
+      if (printSkippedNodeLogging) {
+        cs.printedVerboseLoggingForAsyncScheduling = true;
       }
     } else {
       // Get all partitions
@@ -1541,20 +1560,37 @@ public class CapacityScheduler extends
             || assignedContainers < maxAssignPerHeartbeat);
   }
 
-  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
-          String partition) {
-    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+  private Map<NodeId, FiCaSchedulerNode> getNodesHeartbeated(String partition) {
     Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+    boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this);
     List<FiCaSchedulerNode> nodes = nodeTracker
-            .getNodesPerPartition(partition);
+        .getNodesPerPartition(partition);
+
     if (nodes != null && !nodes.isEmpty()) {
       //Filter for node heartbeat too long
       nodes.stream()
-              .filter(node -> !shouldSkipNodeSchedule(node, this, true))
-              .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+          .filter(node ->
+              !shouldSkipNodeSchedule(node, this, printSkippedNodeLogging))
+          .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+    }
+
+    if (printSkippedNodeLogging) {
+      printedVerboseLoggingForAsyncScheduling = true;
+    }
+    return nodesByPartition;
+  }
+
+  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+          String partition) {
+    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+    Map<NodeId, FiCaSchedulerNode> nodesByPartition
+        = getNodesHeartbeated(partition);
+
+    if (!nodesByPartition.isEmpty()) {
       candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
-              nodesByPartition, partition);
+          nodesByPartition, partition);
     }
+
     return candidates;
   }
 
@@ -1563,11 +1599,9 @@ public class CapacityScheduler extends
     CandidateNodeSet<FiCaSchedulerNode> candidates = null;
     candidates = new SimpleCandidateNodeSet<>(node);
     if (multiNodePlacementEnabled) {
-      Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
-      List<FiCaSchedulerNode> nodes = nodeTracker
-              .getNodesPerPartition(node.getPartition());
-      if (nodes != null && !nodes.isEmpty()) {
-        nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+      Map<NodeId, FiCaSchedulerNode> nodesByPartition =
+          getNodesHeartbeated(node.getPartition());
+      if (!nodesByPartition.isEmpty()) {
         candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
                 nodesByPartition, node.getPartition());
       }
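
The once-per-10-seconds gating of the skipped-node debug log can also be read in isolation. The sketch below borrows the flag name from the patch but is otherwise a simplified, hypothetical stand-alone class; callers are expected to invoke markPrinted() after logging, mirroring how the scheduler sets printedVerboseLoggingForAsyncScheduling above:

    import org.apache.hadoop.util.Time;

    public class SkippedNodeLogGate {
      // Latched once the verbose skipped-node line has been printed in the
      // current 10-second window.
      private boolean printedVerboseLoggingForAsyncScheduling;

      public boolean shouldPrintNow(boolean debugEnabled) {
        boolean print = false;
        if (debugEnabled) {
          if (Time.monotonicNow() / 1000 % 10 == 0) {
            // Inside the one-second slot that opens every 10 seconds: print
            // only if nothing has been printed during this slot yet.
            print = !printedVerboseLoggingForAsyncScheduling;
          } else {
            // Outside the slot: reset the latch so the next slot prints again.
            printedVerboseLoggingForAsyncScheduling = false;
          }
        }
        return print;
      }

      // Called by the scheduling loop after it has emitted the debug line.
      public void markPrinted() {
        printedVerboseLoggingForAsyncScheduling = true;
      }
    }
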
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
index f9fcdfd..f77a55d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
@@ -135,7 +135,7 @@ public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {
       Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
       List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
           .getScheduler()).getNodeTracker().getNodesPerPartition(label);
-      if (nodes != null && !nodes.isEmpty()) {
+      if (nodes != null) {
         nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
         multiNodePolicy.addAndRefreshNodesSet(
             (Collection<N>) nodesByPartition.values(), label);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
index c8a7e66..8c5691f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -30,8 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 
 /**
  * Node Sorting Manager which runs all sorter threads and policies.
@@ -48,6 +51,7 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
   private Set<MultiNodePolicySpec> policySpecs = new HashSet<MultiNodePolicySpec>();
   private Configuration conf;
   private boolean multiNodePlacementEnabled;
+  private long skipNodeInterval;
 
   public MultiNodeSortingManager() {
     super("MultiNodeSortingManager");
@@ -59,6 +63,7 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
     LOG.info("Initializing NodeSortingService=" + getName());
     super.serviceInit(configuration);
     this.conf = configuration;
+    this.skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
   }
 
   @Override
@@ -134,6 +139,42 @@ public class MultiNodeSortingManager<N extends SchedulerNode>
       policy.addAndRefreshNodesSet(nodes, partition);
     }
 
-    return policy.getPreferredNodeIterator(nodes, partition);
+    Iterator<N> nodesIterator = policy.getPreferredNodeIterator(nodes,
+        partition);
+
+    // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+    // heartbeats since the node might be dead and we should not continue
+    // to allocate containers on it.
+    Iterator<N> filteringIterator = new Iterator() {
+      private N cached;
+      private boolean hasCached;
+      @Override
+      public boolean hasNext() {
+        if (hasCached) {
+          return true;
+        }
+        while (nodesIterator.hasNext()) {
+          cached = nodesIterator.next();
+          if (SchedulerUtils.isNodeHeartbeated(cached, skipNodeInterval)) {
+            hasCached = true;
+            return true;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public N next() {
+        if (hasCached) {
+          hasCached = false;
+          return cached;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return next();
+      }
+    };
+    return filteringIterator;
   }
 }
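
The look-ahead filtering iterator above follows a reusable hasNext()/cached pattern. Below is a generic, hedged sketch of the same technique with the heartbeat check replaced by an arbitrary Predicate; the names here are illustrative and not part of the patch:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Predicate;

    public class FilteringIteratorSketch {
      // Wraps a source iterator and yields only elements accepted by the
      // predicate, looking ahead and caching the next accepted element.
      static <T> Iterator<T> filtering(Iterator<T> source, Predicate<T> keep) {
        return new Iterator<T>() {
          private T cached;
          private boolean hasCached;

          @Override
          public boolean hasNext() {
            if (hasCached) {
              return true;
            }
            while (source.hasNext()) {
              T candidate = source.next();
              if (keep.test(candidate)) {
                cached = candidate;
                hasCached = true;
                return true;
              }
            }
            return false;
          }

          @Override
          public T next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            hasCached = false;
            return cached;
          }
        };
      }

      public static void main(String[] args) {
        Iterator<Integer> evens =
            filtering(Arrays.asList(1, 2, 3, 4, 5).iterator(), n -> n % 2 == 0);
        evens.forEachRemaining(System.out::println); // prints 2 then 4
      }
    }
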
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 5f2bbf0..653a6ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -574,8 +574,6 @@ public class TestCapacitySchedulerAsyncScheduling {
         + ".scheduling-interval-ms", 100);
     // Heartbeat interval is 100 ms.
     conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
-
-    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);
 
     // inject node label manager
@@ -648,6 +646,112 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.close();
   }
 
+  /**
+   * Make sure the scheduler skips NMs that are not RUNNING.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoRunningNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        heartbeatInterval);
+    conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER,
+        5);
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    // Add 10 nodes to the cluster; in total the cluster has 200 GB of resources.
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+    }
+
+    keepNMHeartbeat(nms, heartbeatInterval);
+
+    List<MockAM> ams = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = MockRMAppSubmitter.submit(rm,
+          MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
+              .withAppName("app")
+              .withUser("user")
+              .withAcls(null)
+              .withUnmanagedAM(false)
+              .withQueue(Character.toString((char) (i % 34 + 97)))
+              .withMaxAppAttempts(1)
+              .withCredentials(null)
+              .withAppType(null)
+              .withWaitForAppAcceptedState(false)
+              .build());
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    // Decommission NMs 0-4 and verify that nodes not in NodeState.RUNNING are skipped.
+    for (int i = 0; i < 5; i++) {
+      RMNode rmNode = cs.getNode(nms.get(i).getNodeId()).getRMNode();
+      cs.getRMContext().getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(rmNode.getNodeID(),
+              RMNodeEventType.GRACEFUL_DECOMMISSION));
+      rm.drainEvents();
+      Assert.assertEquals(NodeState.DECOMMISSIONING, rmNode.getState());
+      boolean shouldSkip =
+          cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()),
+              cs, true);
+      // make sure the node should be skipped
+      Assert.assertTrue(shouldSkip);
+    }
+
+    for (int i = 5; i < 9; i++) {
+      boolean shouldSkip =
+          cs.shouldSkipNodeSchedule(cs.getNode(nms.get(i).getNodeId()),
+              cs, true);
+      // make sure the node should not be skipped
+      Assert.assertFalse(shouldSkip);
+    }
+
+    pauseNMHeartbeat();
+
+    // Sleep for only 3 heartbeat intervals, so the configured multiplier of 5 is not exceeded.
+    Thread.sleep(heartbeatInterval * 3);
+
+    // Applications request containers.
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+    }
+
+    // Wait for 2000 ms.
+    Thread.sleep(2000);
+
+    // Make sure NMs 0-4 have no non-AM containers, while NMs 5-8 do.
+    for (int i = 0; i < 9; i++) {
+      if (i < 5) {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0);
+      } else {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0);
+      }
+    }
+    rm.close();
+  }
+
   public static class NMHeartbeatThread extends Thread {
     private List<MockNM> mockNMS;
     private int interval;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index 29de815..b20f8e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Iterators;
+
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -437,4 +440,39 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
 
     rm1.close();
   }
+
+  @Test
+  public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    rm.registerNode("127.0.0.1:1234", 10 * GB);
+    rm.registerNode("127.0.0.2:1234", 10 * GB);
+    rm.registerNode("127.0.0.3:1234", 10 * GB);
+    rm.registerNode("127.0.0.4:1234", 10 * GB);
+
+    Set<SchedulerNode> nodes = new HashSet<>();
+    String partition = "";
+
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    waitforNMRegistered(scheduler, 4, 5);
+    MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
+        .getMultiNodeSortingManager();
+    MultiNodeSorter<SchedulerNode> sorter = mns
+        .getMultiNodePolicy(POLICY_CLASS_NAME);
+    sorter.reSortClusterNodes();
+
+    Iterator<SchedulerNode> nodeIterator = mns.getMultiNodeSortIterator(
+        nodes, partition, POLICY_CLASS_NAME);
+    Assert.assertEquals(4, Iterators.size(nodeIterator));
+
+    // Validate the count after missing 3 node heartbeats
+    Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3);
+
+    nodeIterator = mns.getMultiNodeSortIterator(
+        nodes, partition, POLICY_CLASS_NAME);
+    Assert.assertEquals(0, Iterators.size(nodeIterator));
+
+    rm.stop();
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
index 65e0a17..e1435ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java
@@ -111,6 +111,7 @@ public class TestCapacitySchedulerMultiNodesWithPreemption
     conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 60000);
   }
 
   @Test(timeout=60000)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
index 67ff8cc..e37a8d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -127,6 +127,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
       conf.set(policyConfPrefix + ".sorting-interval.ms", "0");
       conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
           YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+      conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 30000);
       rm = new MockRM(conf);
       bind(ResourceManager.class).toInstance(rm);
       serve("/*").with(GuiceContainer.class);

