Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/02/17 14:49:57 UTC

[17/50] [abbrv] hadoop git commit: YARN-6163. FS Preemption is a trickle for severely starved applications. (kasha)

YARN-6163. FS Preemption is a trickle for severely starved applications. (kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c25dbcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c25dbcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c25dbcd

Branch: refs/heads/HDFS-10285
Commit: 6c25dbcdc0517a825b92fb16444aa1d3761e160c
Parents: a136936
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Wed Feb 15 23:16:01 2017 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Wed Feb 15 23:16:12 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/util/resource/Resources.java    |  18 +++
 .../scheduler/AbstractYarnScheduler.java        |   4 +
 .../scheduler/fair/FSAppAttempt.java            | 110 ++++++++++++--
 .../scheduler/fair/FSLeafQueue.java             | 111 +++++++++-----
 .../scheduler/fair/FSPreemptionThread.java      | 132 ++++++++---------
 .../scheduler/fair/FairScheduler.java           |   4 +
 .../fair/FairSchedulerConfiguration.java        |  23 ++-
 .../fair/VisitedResourceRequestTracker.java     | 146 +++++++++++++++++++
 .../fair/FairSchedulerWithMockPreemption.java   |   5 +-
 .../scheduler/fair/TestFSAppStarvation.java     |  20 ++-
 .../fair/TestFairSchedulerPreemption.java       |  45 +++---
 .../fair/TestVisitedResourceRequestTracker.java | 112 ++++++++++++++
 12 files changed, 585 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 044a232..57b3a46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -182,6 +182,24 @@ public class Resources {
     return subtractFrom(clone(lhs), rhs);
   }
 
+  /**
+   * Subtract <code>rhs</code> from <code>lhs</code> and reset any negative
+   * values to zero.
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+    subtractFrom(lhs, rhs);
+    if (lhs.getMemorySize() < 0) {
+      lhs.setMemorySize(0);
+    }
+    if (lhs.getVirtualCores() < 0) {
+      lhs.setVirtualCores(0);
+    }
+    return lhs;
+  }
+
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
   }
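
The new Resources#subtractFromNonNegative centralizes the clamp-to-zero behaviour that FSAppAttempt previously open-coded (see the FSAppAttempt hunk below) and that the rest of the preemption math now relies on. A minimal standalone sketch of the same semantics on a plain (memoryMB, vcores) pair, for illustration only and not the Hadoop Resource API:

    // Standalone sketch, not the Hadoop API: clamp-to-zero subtraction on a
    // (memoryMB, vcores) pair, mirroring Resources.subtractFromNonNegative.
    public class SubtractFromNonNegativeSketch {
      static long[] subtractFromNonNegative(long[] lhs, long[] rhs) {
        lhs[0] = Math.max(0, lhs[0] - rhs[0]);   // memory (MB)
        lhs[1] = Math.max(0, lhs[1] - rhs[1]);   // vcores
        return lhs;
      }

      public static void main(String[] args) {
        long[] available = {1024, 1};
        subtractFromNonNegative(available, new long[] {4096, 2});
        // Prints "0 MB, 0 vcores" instead of a negative availability.
        System.out.println(available[0] + " MB, " + available[1] + " vcores");
      }
    }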

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64427b7..ce6d2a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -127,6 +127,7 @@ public abstract class AbstractYarnScheduler
    */
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
+  protected long nmHeartbeatInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -163,6 +164,9 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    nmHeartbeatInterval =
+        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6dfcc84..563b892 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -87,6 +87,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
+  private long nextStarvationCheck;
 
   // minShareStarvation attributed to this application by the leaf queue
   private Resource minshareStarvation = Resources.none();
@@ -211,15 +212,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
     for (FSSchedulerNode node: blacklistNodeIds) {
-      Resources.subtractFrom(availableResources,
+      Resources.subtractFromNonNegative(availableResources,
           node.getUnallocatedResource());
     }
-    if (availableResources.getMemorySize() < 0) {
-      availableResources.setMemorySize(0);
-    }
-    if (availableResources.getVirtualCores() < 0) {
-      availableResources.setVirtualCores(0);
-    }
   }
 
   /**
@@ -530,6 +525,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
+   * Get last computed fairshare starvation.
+   *
+   * @return last computed fairshare starvation
+   */
+  Resource getFairshareStarvation() {
+    return fairshareStarvation;
+  }
+
+  /**
    * Set the minshare attributed to this application. To be called only from
    * {@link FSLeafQueue#updateStarvedApps}.
    *
@@ -1077,17 +1081,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Helper method that computes the extent of fairshare fairshareStarvation.
+   * Helper method that computes the extent of fairshare starvation.
+   * @return freshly computed fairshare starvation
    */
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+    Resource starvation = Resources.componentwiseMin(threshold, demand);
+    Resources.subtractFromNonNegative(starvation, getResourceUsage());
 
     long now = scheduler.getClock().getTime();
-    boolean starved = Resources.greaterThan(
-        fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+    boolean starved = !Resources.isNone(starvation);
 
     if (!starved) {
       lastTimeAtFairShare = now;
@@ -1111,6 +1115,81 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return !Resources.isNone(fairshareStarvation);
   }
 
+  /**
+   * Fetch a list of RRs corresponding to the extent the app is starved
+   * (fairshare and minshare). This method accounts for the number of
+   * containers in a RR and considers only one locality level per RR (the
+   * first resourceName encountered).
+   *
+   * @return list of {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   */
+  List<ResourceRequest> getStarvedResourceRequests() {
+    // List of RRs we build in this method to return
+    List<ResourceRequest> ret = new ArrayList<>();
+
+    // Track visited RRs to avoid the same RR at multiple locality levels
+    VisitedResourceRequestTracker visitedRRs =
+        new VisitedResourceRequestTracker(scheduler.getNodeTracker());
+
+    // Start with current starvation and track the pending amount
+    Resource pending = getStarvation();
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      if (Resources.isNone(pending)) {
+        // Found enough RRs to match the starvation
+        break;
+      }
+
+      // See if we have already seen this RR
+      if (!visitedRRs.visit(rr)) {
+        continue;
+      }
+
+      // A RR can have multiple containers of a capability. We need to
+      // compute the number of containers that fit in "pending".
+      int numContainersThatFit = (int) Math.floor(
+          Resources.ratio(scheduler.getResourceCalculator(),
+              pending, rr.getCapability()));
+      if (numContainersThatFit == 0) {
+        // This RR's capability is too large to fit in pending
+        continue;
+      }
+
+      // If the RR is only partially being satisfied, include only the
+      // partial number of containers.
+      if (numContainersThatFit < rr.getNumContainers()) {
+        rr = ResourceRequest.newInstance(rr.getPriority(),
+            rr.getResourceName(), rr.getCapability(), numContainersThatFit);
+      }
+
+      // Add the RR to return list and adjust "pending" accordingly
+      ret.add(rr);
+      Resources.subtractFromNonNegative(pending,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+
+    return ret;
+  }
+
+  /**
+   * Notify this app that preemption has been triggered to make room for
+   * outstanding demand. The app should not be considered starved until after
+   * the specified delay.
+   *
+   * @param delayBeforeNextStarvationCheck duration to wait
+   */
+  void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+    nextStarvationCheck =
+        scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+  }
+
+  /**
+   * Whether this app's starvation should be considered.
+   */
+  boolean shouldCheckForStarvation() {
+    return scheduler.getClock().getTime() >= nextStarvationCheck;
+  }
+
   /* Schedulable methods implementation */
 
   @Override
@@ -1123,6 +1202,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return demand;
   }
 
+  /**
+   * Get the current app's unsatisfied demand.
+   */
+  Resource getPendingDemand() {
+    return Resources.subtract(demand, getResourceUsage());
+  }
+
   @Override
   public long getStartTime() {
     return startTime;
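
The core of getStarvedResourceRequests() above is the fit calculation: each RR is carried over only up to the number of containers that still fit inside the remaining starvation, and the carried amount is then charged against "pending". A worked sketch of that arithmetic on memory alone, assuming a memory-only resource calculator so Resources.ratio reduces to a simple division; the numbers are made up for illustration:

    // Worked sketch of the container-fit arithmetic, memory-only, made-up numbers.
    public class StarvedRequestTrimSketch {
      public static void main(String[] args) {
        long pendingMB = 6144;        // remaining starvation: 6 GB
        long capabilityMB = 2048;     // each container in the RR asks for 2 GB
        int numContainersInRR = 10;   // the RR asks for 10 such containers

        // How many whole containers still fit in "pending"?
        int numContainersThatFit = (int) Math.floor((double) pendingMB / capabilityMB);

        // Carry over at most what the RR actually asks for.
        int carried = Math.min(numContainersThatFit, numContainersInRR);

        // Charge the carried containers against the remaining starvation.
        long newPendingMB = Math.max(0, pendingMB - (long) carried * capabilityMB);

        // Prints: carried 3 containers, 0 MB of starvation left
        System.out.println("carried " + carried + " containers, "
            + newPendingMB + " MB of starvation left");
      }
    }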

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 16070e0..c4b2de6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -220,54 +220,53 @@ public class FSLeafQueue extends FSQueue {
   }
 
   /**
-   * Helper method to identify starved applications. This needs to be called
-   * ONLY from {@link #updateInternal}, after the application shares
-   * are updated.
-   *
-   * A queue can be starving due to fairshare or minshare.
-   *
-   * Minshare is defined only on the queue and not the applications.
-   * Fairshare is defined for both the queue and the applications.
-   *
-   * If this queue is starved due to minshare, we need to identify the most
-   * deserving apps if they themselves are not starved due to fairshare.
+   * Compute the extent of fairshare starvation for a set of apps.
    *
-   * If this queue is starving due to fairshare, there must be at least
-   * one application that is starved. And, even if the queue is not
-   * starved due to fairshare, there might still be starved applications.
+   * @param appsWithDemand apps to compute fairshare starvation for
+   * @return aggregate fairshare starvation for all apps
    */
-  private void updateStarvedApps() {
-    // First identify starved applications and track total amount of
-    // starvation (in resources)
+  private Resource updateStarvedAppsFairshare(
+      TreeSet<FSAppAttempt> appsWithDemand) {
     Resource fairShareStarvation = Resources.clone(none());
-
     // Fetch apps with unmet demand sorted by fairshare starvation
-    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
     for (FSAppAttempt app : appsWithDemand) {
       Resource appStarvation = app.fairShareStarvation();
-      if (!Resources.equals(Resources.none(), appStarvation))  {
+      if (!Resources.isNone(appStarvation))  {
         context.getStarvedApps().addStarvedApp(app);
         Resources.addTo(fairShareStarvation, appStarvation);
       } else {
         break;
       }
     }
+    return fairShareStarvation;
+  }
 
-    // Compute extent of minshare starvation
-    Resource minShareStarvation = minShareStarvation();
-
-    // Compute minshare starvation that is not subsumed by fairshare starvation
-    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+  /**
+   * Distribute minshare starvation to a set of apps
+   * @param appsWithDemand set of apps
+   * @param minShareStarvation minshare starvation to distribute
+   */
+  private void updateStarvedAppsMinshare(
+      final TreeSet<FSAppAttempt> appsWithDemand,
+      final Resource minShareStarvation) {
+    Resource pending = Resources.clone(minShareStarvation);
 
     // Keep adding apps to the starved list until the unmet demand goes over
     // the remaining minshare
     for (FSAppAttempt app : appsWithDemand) {
-      if (Resources.greaterThan(policy.getResourceCalculator(),
-          scheduler.getClusterResource(), minShareStarvation, none())) {
-        Resource appPendingDemand =
-            Resources.subtract(app.getDemand(), app.getResourceUsage());
-        Resources.subtractFrom(minShareStarvation, appPendingDemand);
-        app.setMinshareStarvation(appPendingDemand);
+      if (!Resources.isNone(pending)) {
+        Resource appMinShare = app.getPendingDemand();
+        Resources.subtractFromNonNegative(
+            appMinShare, app.getFairshareStarvation());
+
+        if (Resources.greaterThan(policy.getResourceCalculator(),
+            scheduler.getClusterResource(), appMinShare, pending)) {
+          Resources.subtractFromNonNegative(appMinShare, pending);
+          pending = none();
+        } else {
+          Resources.subtractFromNonNegative(pending, appMinShare);
+        }
+        app.setMinshareStarvation(appMinShare);
         context.getStarvedApps().addStarvedApp(app);
       } else {
         // Reset minshare starvation in case we had set it in a previous
@@ -277,6 +276,40 @@ public class FSLeafQueue extends FSQueue {
     }
   }
 
+  /**
+   * Helper method to identify starved applications. This needs to be called
+   * ONLY from {@link #updateInternal}, after the application shares
+   * are updated.
+   *
+   * A queue can be starving due to fairshare or minshare.
+   *
+   * Minshare is defined only on the queue and not the applications.
+   * Fairshare is defined for both the queue and the applications.
+   *
+   * If this queue is starved due to minshare, we need to identify the most
+   * deserving apps if they themselves are not starved due to fairshare.
+   *
+   * If this queue is starving due to fairshare, there must be at least
+   * one application that is starved. And, even if the queue is not
+   * starved due to fairshare, there might still be starved applications.
+   */
+  private void updateStarvedApps() {
+    // Fetch apps with pending demand
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
+
+    // Process apps with fairshare starvation
+    Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
+
+    // Compute extent of minshare starvation
+    Resource minShareStarvation = minShareStarvation();
+
+    // Compute minshare starvation that is not subsumed by fairshare starvation
+    Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
+
+    // Assign this minshare to apps with pending demand over fairshare
+    updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
+  }
+
   @Override
   public Resource getDemand() {
     return demand;
@@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue {
       return assigned;
     }
 
-    for (FSAppAttempt sched : fetchAppsWithDemand()) {
+    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
@@ -368,14 +401,24 @@ public class FSLeafQueue extends FSQueue {
     return assigned;
   }
 
-  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+  /**
+   * Fetch the subset of apps that have unmet demand. When used for
+   * preemption-related code (as opposed to allocation), omits apps that
+   * should not be checked for starvation.
+   *
+   * @param assignment whether the apps are fetched for container assignment,
+   *                   as opposed to preemption calculations
+   * @return Set of apps with unmet demand
+   */
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
     TreeSet<FSAppAttempt> pendingForResourceApps =
         new TreeSet<>(policy.getComparator());
     readLock.lock();
     try {
       for (FSAppAttempt app : runnableApps) {
         Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(none())) {
+        if (!Resources.isNone(pending) &&
+            (assignment || app.shouldCheckForStarvation())) {
           pendingForResourceApps.add(app);
         }
       }
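
The refactored fetchAppsWithDemand(boolean) above is where the new grace period takes effect: for container assignment every app with pending demand still qualifies, while the preemption path skips apps whose shouldCheckForStarvation() has not come due. A small self-contained sketch of that filter using a hypothetical App type (not the Hadoop classes; a Java 16+ record is used for brevity):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the fetchAppsWithDemand(boolean) filter, hypothetical App type.
    public class FetchAppsWithDemandSketch {
      record App(String name, long pendingMB, long nextStarvationCheck) {
        boolean shouldCheckForStarvation(long now) {
          return now >= nextStarvationCheck;
        }
      }

      static List<App> fetchAppsWithDemand(List<App> runnableApps,
          boolean assignment, long now) {
        List<App> result = new ArrayList<>();
        for (App app : runnableApps) {
          // Same condition as the hunk above: pending demand, and either the
          // call is for assignment or the starvation grace period has expired.
          if (app.pendingMB() > 0
              && (assignment || app.shouldCheckForStarvation(now))) {
            result.add(app);
          }
        }
        return result;
      }

      public static void main(String[] args) {
        List<App> apps = List.of(
            new App("a", 1024, 0),      // starvation check already due
            new App("b", 2048, 5_000),  // recently handed to the preemption thread
            new App("c", 0, 0));        // no pending demand
        System.out.println(fetchAppsWithDemand(apps, true, 1_000));   // a and b
        System.out.println(fetchAppsWithDemand(apps, false, 1_000));  // only a
      }
    }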

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index f166878..af73c10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
@@ -43,20 +41,26 @@ class FSPreemptionThread extends Thread {
   protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
+  private final long delayBeforeNextStarvationCheck;
   private final Timer preemptionTimer;
 
   FSPreemptionThread(FairScheduler scheduler) {
+    setDaemon(true);
+    setName("FSPreemptionThread");
     this.scheduler = scheduler;
     this.context = scheduler.getContext();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
     context.setPreemptionEnabled();
     context.setPreemptionUtilizationThreshold(
         fsConf.getPreemptionUtilizationThreshold());
-    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
     preemptionTimer = new Timer("Preemption Timer", true);
 
-    setDaemon(true);
-    setName("FSPreemptionThread");
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+        ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+        : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+    delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+        fsConf.getWaitTimeBeforeNextStarvationCheck();
   }
 
   public void run() {
@@ -64,13 +68,8 @@ class FSPreemptionThread extends Thread {
       FSAppAttempt starvedApp;
       try{
         starvedApp = context.getStarvedApps().take();
-        if (!Resources.isNone(starvedApp.getStarvation())) {
-          PreemptableContainers containers =
-              identifyContainersToPreempt(starvedApp);
-          if (containers != null) {
-            preemptContainers(containers.containers);
-          }
-        }
+        preemptContainers(identifyContainersToPreempt(starvedApp));
+        starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
       } catch (InterruptedException e) {
         LOG.info("Preemption thread interrupted! Exiting.");
         return;
@@ -79,58 +78,57 @@ class FSPreemptionThread extends Thread {
   }
 
   /**
-   * Given an app, identify containers to preempt to satisfy the app's next
-   * resource request.
+   * Given an app, identify containers to preempt to satisfy the app's
+   * starvation.
+   *
+   * Mechanics:
+   * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   * 2. For each {@link ResourceRequest}, iterate through matching
+   * nodes and identify containers to preempt all on one node, also
+   * optimizing for least number of AM container preemptions.
    *
    * @param starvedApp starved application for which we are identifying
    *                   preemption targets
-   * @return list of containers to preempt to satisfy starvedApp, null if the
-   * app cannot be satisfied by preempting any running containers
+   * @return list of containers to preempt to satisfy starvedApp
    */
-  private PreemptableContainers identifyContainersToPreempt(
+  private List<RMContainer> identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
-    PreemptableContainers bestContainers = null;
-
-    // Find the nodes that match the next resource request
-    SchedulingPlacementSet nextPs =
-        starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet();
-    PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY);
-    // TODO (KK): Should we check other resource requests if we can't match
-    // the first one?
-
-    Resource requestCapability = firstPendingAsk.getPerAllocationResource();
-
-    List<FSSchedulerNode> potentialNodes =
-        scheduler.getNodeTracker().getNodesByResourceName(
-            nextPs.getAcceptedResouceNames().next().toString());
+    List<RMContainer> containersToPreempt = new ArrayList<>();
+
+    // Iterate through enough RRs to address app's starvation
+    for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+      for (int i = 0; i < rr.getNumContainers(); i++) {
+        PreemptableContainers bestContainers = null;
+        List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+            .getNodesByResourceName(rr.getResourceName());
+        for (FSSchedulerNode node : potentialNodes) {
+          // TODO (YARN-5829): Attempt to reserve the node for starved app.
+          if (isNodeAlreadyReserved(node, starvedApp)) {
+            continue;
+          }
 
-    // From the potential nodes, pick a node that has enough containers
-    // from apps over their fairshare
-    for (FSSchedulerNode node : potentialNodes) {
-      // TODO (YARN-5829): Attempt to reserve the node for starved app. The
-      // subsequent if-check needs to be reworked accordingly.
-      FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
-      if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
-        // This node is already reserved by another app. Let us not consider
-        // this for preemption.
-        continue;
-      }
+          int maxAMContainers = bestContainers == null ?
+              Integer.MAX_VALUE : bestContainers.numAMContainers;
+          PreemptableContainers preemptableContainers =
+              identifyContainersToPreemptOnNode(
+                  rr.getCapability(), node, maxAMContainers);
+          if (preemptableContainers != null) {
+            // This set is better than any previously identified set.
+            bestContainers = preemptableContainers;
+            if (preemptableContainers.numAMContainers == 0) {
+              break;
+            }
+          }
+        } // End of iteration through nodes for one RR
 
-      int maxAMContainers = bestContainers == null ?
-          Integer.MAX_VALUE : bestContainers.numAMContainers;
-      PreemptableContainers preemptableContainers =
-          identifyContainersToPreemptOnNode(requestCapability, node,
-              maxAMContainers);
-      if (preemptableContainers != null) {
-        if (preemptableContainers.numAMContainers == 0) {
-          return preemptableContainers;
-        } else {
-          bestContainers = preemptableContainers;
+        if (bestContainers != null && bestContainers.containers.size() > 0) {
+          containersToPreempt.addAll(bestContainers.containers);
+          trackPreemptionsAgainstNode(bestContainers.containers);
         }
       }
-    }
-
-    return bestContainers;
+    } // End of iteration over RRs
+    return containersToPreempt;
   }
 
   /**
@@ -181,23 +179,25 @@ class FSPreemptionThread extends Thread {
     return null;
   }
 
-  private void preemptContainers(List<RMContainer> containers) {
-    // Mark the containers as being considered for preemption on the node.
-    // Make sure the containers are subsequently removed by calling
-    // FSSchedulerNode#removeContainerForPreemption.
-    if (containers.size() > 0) {
-      FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
-          .getNode(containers.get(0).getNodeId());
-      node.addContainersForPreemption(containers);
-    }
+  private boolean isNodeAlreadyReserved(
+      FSSchedulerNode node, FSAppAttempt app) {
+    FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+    return nodeReservedApp != null && !nodeReservedApp.equals(app);
+  }
 
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+    FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+        .getNode(containers.get(0).getNodeId());
+    node.addContainersForPreemption(containers);
+  }
+
+  private void preemptContainers(List<RMContainer> containers) {
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
       FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      FSLeafQueue queue = app.getQueue();
       LOG.info("Preempting container " + container +
-          " from queue " + queue.getName());
+          " from queue " + app.getQueueName());
       app.trackContainerForPreemption(container);
     }
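
The constructor change earlier in this hunk derives how long a starved app is left alone after preemption is triggered: the kill-warning window, plus enough time for the freed resources to actually be allocated (node heartbeats or continuous-scheduling runs), plus the configurable extra wait. A back-of-the-envelope sketch with assumed values (the 15 s wait-before-kill and 10 s extra wait are the defaults in this patch; the 1000 ms NM heartbeat and 5 ms continuous-scheduling sleep are assumptions about stock YARN defaults):

    // Arithmetic sketch of delayBeforeNextStarvationCheck, assumed defaults.
    public class StarvationDelaySketch {
      public static void main(String[] args) {
        long warnTimeBeforeKill = 15_000L;          // waitTimeBeforeKill default
        long waitBeforeNextCheck = 10_000L;         // new knob's default
        boolean continuousScheduling = false;       // assumption
        long continuousSchedulingSleepMs = 5L;      // assumption
        long nmHeartbeatIntervalMs = 1_000L;        // assumption

        long allocDelay = continuousScheduling
            ? 10 * continuousSchedulingSleepMs      // 10 scheduling runs
            : 4 * nmHeartbeatIntervalMs;            // 4 node heartbeats

        long delay = warnTimeBeforeKill + allocDelay + waitBeforeNextCheck;
        System.out.println("next starvation check in " + delay + " ms");  // 29000
      }
    }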
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 134efff..18806bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1774,4 +1774,8 @@ public class FairScheduler extends
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
   }
+
+  long getNMHeartbeatInterval() {
+    return nmHeartbeatInterval;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b18dd7d..8e8e37b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration {
   protected static final String PREEMPTION_THRESHOLD =
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-  
-  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
-  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
   protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
+  /**
+   * Configurable delay (ms) before an app's starvation is considered after
+   * it is identified. This is to give the scheduler enough time to
+   * allocate containers post preemption. This delay is added to the
+   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
+   *
+   * This is intended to be a backdoor on production clusters, and hence
+   * intentionally not documented.
+   */
+  protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
+      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+  protected static final long
+      DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
+
   /** Whether to assign multiple containers in one check-in. */
   public static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
   protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration {
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
   
-  public int getPreemptionInterval() {
-    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  public long getWaitTimeBeforeNextStarvationCheck() {
+    return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
+        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
   }
   
   public int getWaitTimeBeforeKill() {
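
The old preemptionInterval knob is removed in favour of the starvation-check wait above. A hedged usage sketch for tuning it through a plain Hadoop Configuration object, assuming CONF_PREFIX expands to "yarn.scheduler.fair." (the key is deliberately left undocumented, per the javadoc above):

    import org.apache.hadoop.conf.Configuration;

    // Usage sketch: the full property name assumes CONF_PREFIX is
    // "yarn.scheduler.fair."; the key is intentionally undocumented.
    public class StarvationCheckConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck", 5_000L);
        // Prints 5000; the second argument is only a caller-side fallback.
        System.out.println(conf.getLong(
            "yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck", 10_000L));
      }
    }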

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
new file mode 100644
index 0000000..f157263
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Applications place {@link ResourceRequest}s at multiple levels. This is a
+ * helper class that allows tracking if a {@link ResourceRequest} has been
+ * visited at a different locality level.
+ *
+ * This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
+ * The implementation is not thread-safe.
+ */
+class VisitedResourceRequestTracker {
+  private static final Log LOG =
+      LogFactory.getLog(VisitedResourceRequestTracker.class);
+  private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
+      new HashMap<>();
+  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+
+  VisitedResourceRequestTracker(
+      ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
+    this.nodeTracker = nodeTracker;
+  }
+
+  /**
+   * Check if the {@link ResourceRequest} is visited before, and track it.
+   * @param rr {@link ResourceRequest} to visit
+   * @return true if <code>rr</code> is the first visit across all
+   * locality levels, false otherwise
+   */
+  boolean visit(ResourceRequest rr) {
+    Priority priority = rr.getPriority();
+    Resource capability = rr.getCapability();
+
+    Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
+    if (subMap == null) {
+      subMap = new HashMap<>();
+      map.put(priority, subMap);
+    }
+
+    TrackerPerPriorityResource tracker = subMap.get(capability);
+    if (tracker == null) {
+      tracker = new TrackerPerPriorityResource();
+      subMap.put(capability, tracker);
+    }
+
+    return tracker.visit(rr.getResourceName());
+  }
+
+  private class TrackerPerPriorityResource {
+    private Set<String> racksWithNodesVisited = new HashSet<>();
+    private Set<String> racksVisted = new HashSet<>();
+    private boolean anyVisited;
+
+    private boolean visitAny() {
+      if (racksVisted.isEmpty() && racksWithNodesVisited.isEmpty()) {
+        anyVisited = true;
+      }
+      return anyVisited;
+    }
+
+    private boolean visitRack(String rackName) {
+      if (anyVisited || racksWithNodesVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksVisted.add(rackName);
+        return true;
+      }
+    }
+
+    private boolean visitNode(String rackName) {
+      if (anyVisited || racksVisted.contains(rackName)) {
+        return false;
+      } else {
+        racksWithNodesVisited.add(rackName);
+        return true;
+      }
+    }
+
+    /**
+     * Based on whether <code>resourceName</code> is a node, rack or ANY,
+     * check if this has been visited earlier.
+     *
+     * A node is considered visited if its rack or ANY have been visited.
+     * A rack is considered visited if any nodes or ANY have been visited.
+     * Any is considered visited if any of the nodes/racks have been visited.
+     *
+     * @param resourceName nodename or rackname or ANY
+     * @return true if this is the first visit, false otherwise
+     */
+    private boolean visit(String resourceName) {
+      if (resourceName.equals(ResourceRequest.ANY)) {
+        return visitAny();
+      }
+
+      List<FSSchedulerNode> nodes =
+          nodeTracker.getNodesByResourceName(resourceName);
+      int numNodes = nodes.size();
+      if (numNodes == 0) {
+        LOG.error("Found ResourceRequest for a non-existent node/rack named " +
+            resourceName);
+        return false;
+      }
+
+      if (numNodes == 1) {
+        // Found a single node. To be safe, let us verify it is a node and
+        // not a rack with a single node.
+        FSSchedulerNode node = nodes.get(0);
+        if (node.getNodeName().equals(resourceName)) {
+          return visitNode(node.getRackName());
+        }
+      }
+
+      // At this point, it is not ANY or a node. Must be a rack
+      return visitRack(resourceName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
index 25780cd..706cdc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -21,6 +21,8 @@ import java.util.HashSet;
 import java.util.Set;
 
 public class FairSchedulerWithMockPreemption extends FairScheduler {
+  static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000;
+
   @Override
   protected void createPreemptionThread() {
     preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
     private Set<FSAppAttempt> appsAdded = new HashSet<>();
     private int totalAppsAdded = 0;
 
-    MockPreemptionThread(FairScheduler scheduler) {
+    private MockPreemptionThread(FairScheduler scheduler) {
       super(scheduler);
     }
 
@@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
           FSAppAttempt app = context.getStarvedApps().take();
           appsAdded.add(app);
           totalAppsAdded++;
+          app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS);
         } catch (InterruptedException e) {
           return;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d86..3a79ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
 
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
 
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
 
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
             + "minshare and fairshare queues",
         3, preemptionThread.uniqueAppsAdded());
 
-    // Verify the apps get added again on a subsequent update
+    // Verify apps are added again only after the set delay for starvation has
+    // passed.
+    clock.tickSec(1);
     scheduler.update();
-    Thread.yield();
-
+    assertEquals("Apps re-added even before starvation delay passed",
+        preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
     verifyLeafQueueStarvation();
+
+    clock.tickMsec(
+        FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
+    scheduler.update();
     assertTrue("Each app is marked as starved exactly once",
         preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
   }
@@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
     sendEnoughNodeUpdatesToAssignFully();
 
     // Sleep to hit the preemption timeouts
-    Thread.sleep(10);
+    clock.tickMsec(10);
 
     // Scheduler update to populate starved apps
     scheduler.update();
@@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
         ALLOC_FILE.exists());
 
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
     preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
         scheduler.preemptionThread;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 16df1ed..a4d69bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final int GB = 1024;
 
+  // Scheduler clock
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
@@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
 
-  @Parameterized.Parameters
-  public static Collection<Boolean[]> getParameters() {
-    return Arrays.asList(new Boolean[][] {
-        {true}, {false}});
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new Object[][] {
+        {"FairSharePreemption", true},
+        {"MinSharePreemption", false}});
   }
 
-  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+  public TestFairSchedulerPreemption(String name, boolean fairshare)
+      throws IOException {
     fairsharePreemption = fairshare;
     writeAllocFile();
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
         ALLOC_FILE.getAbsolutePath());
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    setupCluster();
   }
 
   @After
@@ -166,8 +173,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   private void setupCluster() throws IOException {
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
 
     // Create and add two nodes to the cluster
     addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
@@ -197,7 +205,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    *
    * @param queueName queue name
    */
-  private void takeAllResource(String queueName) {
+  private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
     ApplicationAttemptId appAttemptId
         = createSchedulingRequest(GB, 1, queueName, "default",
@@ -227,8 +235,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
     starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
-    // Sleep long enough to pass
-    Thread.sleep(10);
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
     scheduler.update();
   }
 
@@ -243,14 +251,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    */
   private void submitApps(String queue1, String queue2)
       throws InterruptedException {
-    takeAllResource(queue1);
+    takeAllResources(queue1);
     preemptHalfResources(queue2);
   }
 
   private void verifyPreemption() throws InterruptedException {
-    // Sleep long enough for four containers to be preempted. Note that the
-    // starved app must be queued four times for containers to be preempted.
-    for (int i = 0; i < 10000; i++) {
+    // Sleep long enough for four containers to be preempted.
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() == 4) {
         break;
       }
@@ -268,7 +275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   private void verifyNoPreemption() throws InterruptedException {
     // Sleep long enough to ensure not even one container is preempted.
-    for (int i = 0; i < 600; i++) {
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() != 8) {
         break;
       }
@@ -279,7 +286,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
-    setupCluster();
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
@@ -291,21 +297,18 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     verifyPreemption();
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     verifyPreemption();
   }
 
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    setupCluster();
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
@@ -331,9 +334,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    setupCluster();
-
-    takeAllResource("root.preemptable.child-1");
+    takeAllResources("root.preemptable.child-1");
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c25dbcd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
new file mode 100644
index 0000000..07b8498
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestVisitedResourceRequestTracker {
+  private final ClusterNodeTracker<FSSchedulerNode>
+      nodeTracker = new ClusterNodeTracker<>();
+  private final ResourceRequest
+      anyRequest, rackRequest, node1Request, node2Request;
+
+  private final String NODE_VISITED = "The node is already visited. ";
+  private final String RACK_VISITED = "The rack is already visited. ";
+  private final String ANY_VISITED = "ANY is already visited. ";
+  private final String NODE_FAILURE = "The node is visited again.";
+  private final String RACK_FAILURE = "The rack is visited again.";
+  private final String ANY_FAILURE = "ANY is visited again.";
+  private final String FIRST_CALL_FAILURE = "First call to visit failed.";
+
+  public TestVisitedResourceRequestTracker() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(1, 2, Resources.createResource(8192, 8));
+
+    FSSchedulerNode node1 = new FSSchedulerNode(rmNodes.get(0), false);
+    nodeTracker.addNode(node1);
+    node1Request = createRR(node1.getNodeName(), 1);
+
+    FSSchedulerNode node2 = new FSSchedulerNode(rmNodes.get(1), false);
+    node2Request = createRR(node2.getNodeName(), 1);
+    nodeTracker.addNode(node2);
+
+    anyRequest = createRR(ResourceRequest.ANY, 2);
+    rackRequest = createRR(node1.getRackName(), 2);
+  }
+
+  private ResourceRequest createRR(String resourceName, int count) {
+    return ResourceRequest.newInstance(
+        Priority.UNDEFINED, resourceName, Resources.none(), count);
+  }
+
+  @Test
+  public void testVisitAnyRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit ANY request first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(anyRequest));
+
+    // All other requests should return false
+    assertFalse(ANY_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
+    assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node1Request));
+    assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node2Request));
+  }
+
+  @Test
+  public void testVisitRackRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit rack request first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(rackRequest));
+
+    // All other requests should return false
+    assertFalse(RACK_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
+    assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node1Request));
+    assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node2Request));
+  }
+
+  @Test
+  public void testVisitNodeRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit node1 first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(node1Request));
+
+    // Rack and ANY should return false
+    assertFalse(NODE_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
+    assertFalse(NODE_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
+
+    // The other node should return true
+    assertTrue(NODE_VISITED + "Different node visit failed",
+        tracker.visit(node2Request));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org