You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/07/14 09:24:56 UTC

hadoop git commit: YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator in DRF queues to prevent unnecessary thrashing. (asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk a431ed907 -> ac94ba3e1


YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator in DRF queues to prevent unnecessary thrashing. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac94ba3e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac94ba3e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac94ba3e

Branch: refs/heads/trunk
Commit: ac94ba3e185115b83351e35c610c2b8ff91b1ebc
Parents: a431ed9
Author: Arun Suresh <as...@apache.org>
Authored: Tue Jul 14 00:23:55 2015 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Jul 14 00:23:55 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FSLeafQueue.java             |   9 +-
 .../scheduler/fair/FairScheduler.java           |  40 ++--
 .../scheduler/fair/SchedulingPolicy.java        |  11 +
 .../DominantResourceFairnessPolicy.java         |  18 +-
 .../fair/policies/FairSharePolicy.java          |  11 +-
 .../scheduler/fair/policies/FifoPolicy.java     |  15 +-
 .../scheduler/fair/TestFSLeafQueue.java         |  64 ++++++
 .../scheduler/fair/TestFairScheduler.java       | 207 ++++++++++++++++---
 9 files changed, 317 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5c17f04..780c667 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -628,6 +628,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3381. Fix typo InvalidStateTransitonException.
     (Brahma Reddy Battula via aajisaka)
 
+    YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator
+    in DRF queues to prevent unnecessary thrashing. (asuresh)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 6779a1b..f90a198 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -560,9 +560,10 @@ public class FSLeafQueue extends FSQueue {
   }
 
   private boolean isStarved(Resource share) {
-    Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
-        scheduler.getClusterResource(), share, getDemand());
-    return Resources.lessThan(scheduler.getResourceCalculator(),
-        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+    Resource desiredShare = Resources.min(policy.getResourceCalculator(),
+            scheduler.getClusterResource(), share, getDemand());
+    Resource resourceUsage = getResourceUsage();
+    return Resources.lessThan(policy.getResourceCalculator(),
+            scheduler.getClusterResource(), resourceUsage, desiredShare);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index cbc10e7..efe6544 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -371,10 +371,9 @@ public class FairScheduler extends
 
     Resource resToPreempt = Resources.clone(Resources.none());
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
+      Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
     }
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
-        Resources.none())) {
+    if (isResourceGreaterThanNone(resToPreempt)) {
       preemptResources(resToPreempt);
     }
   }
@@ -404,8 +403,7 @@ public class FairScheduler extends
       RMContainer container = warnedIter.next();
       if ((container.getState() == RMContainerState.RUNNING ||
               container.getState() == RMContainerState.ALLOCATED) &&
-          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-              toPreempt, Resources.none())) {
+              isResourceGreaterThanNone(toPreempt)) {
         warnOrKillContainer(container);
         Resources.subtractFrom(toPreempt, container.getContainer().getResource());
       } else {
@@ -419,8 +417,7 @@ public class FairScheduler extends
         queue.resetPreemptedResources();
       }
 
-      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-          toPreempt, Resources.none())) {
+      while (isResourceGreaterThanNone(toPreempt)) {
         RMContainer container =
             getQueueManager().getRootQueue().preemptContainer();
         if (container == null) {
@@ -442,7 +439,11 @@ public class FairScheduler extends
     long duration = getClock().getTime() - start;
     fsOpDurations.addPreemptCallDuration(duration);
   }
-  
+
+  private boolean isResourceGreaterThanNone(Resource toPreempt) {
+    return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
+  }
+
   protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSAppAttempt app = getSchedulerApp(appAttemptId);
@@ -485,33 +486,34 @@ public class FairScheduler extends
    * max of the two amounts (this shouldn't happen unless someone sets the
    * timeouts to be identical for some reason).
    */
-  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
+  protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
     long minShareTimeout = sched.getMinSharePreemptionTimeout();
     long fairShareTimeout = sched.getFairSharePreemptionTimeout();
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
+    ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
+      Resource target = Resources.componentwiseMin(
           sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
+      resDueToMinShare = Resources.max(calc, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
-      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
-          sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
+      Resource target = Resources.componentwiseMin(
+              sched.getFairShare(), sched.getDemand());
+      resDueToFairShare = Resources.max(calc, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
+    Resource deficit = Resources.max(calc, clusterResource,
         resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
-        resToPreempt, Resources.none())) {
-      String message = "Should preempt " + resToPreempt + " res for queue "
+    if (Resources.greaterThan(calc, clusterResource,
+        deficit, Resources.none())) {
+      String message = "Should preempt " + deficit + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
           + ", resDueToFairShare = " + resDueToFairShare;
       LOG.info(message);
     }
-    return resToPreempt;
+    return deficit;
   }
 
   public synchronized RMContainerTokenSecretManager

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index abdc834..160ba4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
+
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
@@ -98,6 +101,14 @@ public abstract class SchedulingPolicy {
   public void initialize(Resource clusterCapacity) {}
 
   /**
+   * The {@link ResourceCalculator} returned by this method should be used
+   * for any calculations involving resources.
+   *
+   * @return ResourceCalculator instance to use
+   */
+  public abstract ResourceCalculator getResourceCalculator();
+
+  /**
    * @return returns the name of {@link SchedulingPolicy}
    */
   public abstract String getName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 86d503b..45fbf98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
+
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
@@ -44,8 +47,10 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
 
   public static final String NAME = "DRF";
 
-  private DominantResourceFairnessComparator comparator =
+  private static final DominantResourceFairnessComparator COMPARATOR =
       new DominantResourceFairnessComparator();
+  private static final DominantResourceCalculator CALCULATOR =
+      new DominantResourceCalculator();
 
   @Override
   public String getName() {
@@ -59,9 +64,14 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
 
   @Override
   public Comparator<Schedulable> getComparator() {
-    return comparator;
+    return COMPARATOR;
   }
-  
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return CALCULATOR;
+  }
+
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
@@ -105,7 +115,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
 
   @Override
   public void initialize(Resource clusterCapacity) {
-    comparator.setClusterCapacity(clusterCapacity);
+    COMPARATOR.setClusterCapacity(clusterCapacity);
   }
 
   public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 918db9d..3b9f07f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -43,7 +44,8 @@ public class FairSharePolicy extends SchedulingPolicy {
   public static final String NAME = "fair";
   private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
       new DefaultResourceCalculator();
-  private FairShareComparator comparator = new FairShareComparator();
+  private static final FairShareComparator COMPARATOR =
+          new FairShareComparator();
 
   @Override
   public String getName() {
@@ -111,7 +113,12 @@ public class FairSharePolicy extends SchedulingPolicy {
 
   @Override
   public Comparator<Schedulable> getComparator() {
-    return comparator;
+    return COMPARATOR;
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return RESOURCE_CALCULATOR;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 7d88933..a644e58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
+
+
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -36,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
 public class FifoPolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "FIFO";
-  private FifoComparator comparator = new FifoComparator();
+  private static final FifoComparator COMPARATOR = new FifoComparator();
+  private static final DefaultResourceCalculator CALCULATOR =
+          new DefaultResourceCalculator();
 
   @Override
   public String getName() {
@@ -68,7 +74,12 @@ public class FifoPolicy extends SchedulingPolicy {
 
   @Override
   public Comparator<Schedulable> getComparator() {
-    return comparator;
+    return COMPARATOR;
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return CALCULATOR;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 385ea0b..7637410 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -233,6 +233,70 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     assertFalse(queueB2.isStarvedForFairShare());
   }
 
+  @Test (timeout = 5000)
+  public void testIsStarvedForFairShareDRF() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.5</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.5</weight>");
+    out.println("</queue>");
+    out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
+    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    scheduler.update();
+
+    // Queue A wants 7 * 1024, 1. Node update gives this all to A
+    createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    QueueManager queueMgr = scheduler.getQueueManager();
+    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
+    assertEquals(7 * 1024, queueA.getResourceUsage().getMemory());
+    assertEquals(1, queueA.getResourceUsage().getVirtualCores());
+
+    // Queue B has 3 reqs :
+    // 1) 2 * 1024, 5 .. which will be granted
+    // 2) 1 * 1024, 1 .. which will be granted
+    // 3) 1 * 1024, 1 .. which won't
+    createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
+    createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
+    scheduler.update();
+    for (int i = 0; i < 3; i ++) {
+      scheduler.handle(nodeEvent2);
+    }
+
+    FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
+    assertEquals(3 * 1024, queueB.getResourceUsage().getMemory());
+    assertEquals(6, queueB.getResourceUsage().getVirtualCores());
+
+    scheduler.update();
+
+    // Verify that the queue is not starved for fair share.
+    // Since the starvation logic now uses DRF when the policy = drf, the
+    // queue should not be starved.
+    assertFalse(queueB.isStarvedForFairShare());
+  }
+
   @Test
   public void testConcurrentAccess() {
     conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac94ba3e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 56e8adc..2260f73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -100,7 +100,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -1706,8 +1705,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     clock.tickSec(11);
 
     scheduler.update();
-    Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
-        .getLeafQueue("queueA.queueA2", false), clock.getTime());
+    Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
+            .getLeafQueue("queueA.queueA2", false), clock.getTime());
     assertEquals(3277, toPreempt.getMemory());
 
     // verify if the 3 containers required by queueA2 are preempted in the same
@@ -1829,25 +1828,173 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         scheduler.getQueueManager().getLeafQueue("queueD", true);
 
     assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
+        Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
     assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
+        Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
     // After minSharePreemptionTime has passed, they should want to preempt min
     // share.
     clock.tickSec(6);
     assertEquals(
-        1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
 
     // After fairSharePreemptionTime has passed, they should want to preempt
     // fair share.
     scheduler.update();
     clock.tickSec(6);
     assertEquals(
-        1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+        1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
     assertEquals(
-        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+  }
+
+  @Test
+/**
+ * Tests the timing of the decision to preempt tasks when queues use DRF.
+ */
+  public void testPreemptionDecisionWithDRF() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,1vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,2vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,3vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,2vcores</minResources>");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create three nodes
+    RMNode node1 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
+                    "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
+                    "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
+                    "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    // Queue A and B each request three containers
+    ApplicationAttemptId app1 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+    ApplicationAttemptId app2 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+    ApplicationAttemptId app3 =
+            createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+    ApplicationAttemptId app4 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+    ApplicationAttemptId app5 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+    ApplicationAttemptId app6 =
+            createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 2; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+    }
+
+    // Now new requests arrive from queues C and D
+    ApplicationAttemptId app7 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+    ApplicationAttemptId app8 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+    ApplicationAttemptId app9 =
+            createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+    ApplicationAttemptId app10 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
+    ApplicationAttemptId app11 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
+    ApplicationAttemptId app12 =
+            createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
+
+    scheduler.update();
+
+    FSLeafQueue schedC =
+            scheduler.getQueueManager().getLeafQueue("queueC", true);
+    FSLeafQueue schedD =
+            scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
+    assertTrue(Resources.equals(
+            Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
+
+    // Test :
+    // 1) whether componentWise min works as expected.
+    // 2) DRF calculator is used
+
+    // After minSharePreemptionTime has passed, they should want to preempt min
+    // share.
+    clock.tickSec(6);
+    Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
+    assertEquals(1024, res.getMemory());
+    // Demand = 3
+    assertEquals(3, res.getVirtualCores());
+
+    res = scheduler.resourceDeficit(schedD, clock.getTime());
+    assertEquals(1024, res.getMemory());
+    // Demand = 6, but min share = 2
+    assertEquals(2, res.getVirtualCores());
+
+    // After fairSharePreemptionTime has passed, they should want to preempt
+    // fair share.
+    scheduler.update();
+    clock.tickSec(6);
+    res = scheduler.resourceDeficit(schedC, clock.getTime());
+    assertEquals(1536, res.getMemory());
+    assertEquals(3, res.getVirtualCores());
+
+    res = scheduler.resourceDeficit(schedD, clock.getTime());
+    assertEquals(1536, res.getMemory());
+    // Demand = 6, but fair share = 3
+    assertEquals(3, res.getVirtualCores());
   }
 
   @Test
@@ -1964,71 +2111,71 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
 
     assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime())));
+        Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
     assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime())));
+        Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
     assertTrue(Resources.equals(
-        Resources.none(), scheduler.resToPreempt(queueC, clock.getTime())));
+        Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
 
     // After 5 seconds, queueB1 wants to preempt min share
     scheduler.update();
     clock.tickSec(6);
     assertEquals(
-       1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+       1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
 
     // After 10 seconds, queueB2 wants to preempt min share
     scheduler.update();
     clock.tickSec(5);
     assertEquals(
-        1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
 
     // After 15 seconds, queueC wants to preempt min share
     scheduler.update();
     clock.tickSec(5);
     assertEquals(
-        1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
 
     // After 20 seconds, queueB2 should want to preempt fair share
     scheduler.update();
     clock.tickSec(5);
     assertEquals(
-        1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
 
     // After 25 seconds, queueB1 should want to preempt fair share
     scheduler.update();
     clock.tickSec(5);
     assertEquals(
-        1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
 
     // After 30 seconds, queueC should want to preempt fair share
     scheduler.update();
     clock.tickSec(5);
     assertEquals(
-        1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
     assertEquals(
-        1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
     assertEquals(
-        1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
+        1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
   }
 
   @Test