You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/03/06 23:45:34 UTC

hadoop git commit: YARN-3275. CapacityScheduler: Preemption happening on non-preemptable queues. Contributed by Eric Payne (cherry picked from commit 27e8ea820fab8dce59f4db9814e73bd60c1d4ef1)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 ab251fd35 -> ef3d9bdf6


YARN-3275. CapacityScheduler: Preemption happening on non-preemptable queues. Contributed by Eric Payne
(cherry picked from commit 27e8ea820fab8dce59f4db9814e73bd60c1d4ef1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ef3d9bdf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ef3d9bdf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ef3d9bdf

Branch: refs/heads/branch-2
Commit: ef3d9bdf6ba7ded263dd32eb66e362774a4b868e
Parents: ab251fd
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Mar 6 22:36:18 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Mar 6 22:45:09 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +++
 .../hadoop/yarn/util/resource/Resources.java    |  5 ++++
 .../ProportionalCapacityPreemptionPolicy.java   | 27 ++++++++++++++++----
 ...estProportionalCapacityPreemptionPolicy.java | 24 +++++++++++++++++
 4 files changed, 54 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef3d9bdf/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 48d2302..d565db5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -677,6 +677,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3227. Timeline renew delegation token fails when RM user's TGT is expired
     (Zhijie Shen via xgong)
 
+    YARN-3275. CapacityScheduler: Preemption happening on non-preemptable
+    queues (Eric Payne via jlowe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef3d9bdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index a205bd1..bcb0421 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -260,4 +260,9 @@ public class Resources {
     return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
         Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
+  
+  public static Resource componentwiseMax(Resource lhs, Resource rhs) {
+    return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
+        Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef3d9bdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 738f527..87a2a00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -527,6 +527,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
+      if (qT.preemptionDisabled && qT.leafQueue != null) {
+        if (LOG.isDebugEnabled()) {
+          if (Resources.greaterThan(rc, clusterResource,
+              qT.toBePreempted, Resource.newInstance(0, 0))) {
+            LOG.debug("Tried to preempt the following "
+                      + "resources from non-preemptable queue: "
+                      + qT.queueName + " - Resources: " + qT.toBePreempted);
+          }
+        }
+        continue;
+      }
       // we act only if we are violating balance by more than
       // maxIgnoredOverCapacity
       if (Resources.greaterThan(rc, clusterResource, qT.current,
@@ -734,6 +745,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       float absUsed = root.getAbsoluteUsedCapacity();
       float absCap = root.getAbsoluteCapacity();
       float absMaxCap = root.getAbsoluteMaximumCapacity();
+      boolean preemptionDisabled = root.getPreemptionDisabled();
 
       Resource current = Resources.multiply(clusterResources, absUsed);
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
@@ -747,8 +759,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         LeafQueue l = (LeafQueue) root;
         Resource pending = l.getTotalResourcePending();
         ret = new TempQueue(queueName, current, pending, guaranteed,
-            maxCapacity);
-        if (root.getPreemptionDisabled()) {
+            maxCapacity, preemptionDisabled);
+        if (preemptionDisabled) {
           ret.untouchableExtra = extra;
         } else {
           ret.preemptableExtra = extra;
@@ -757,7 +769,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       } else {
         Resource pending = Resource.newInstance(0, 0);
         ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
-            maxCapacity);
+            maxCapacity, false);
         Resource childrensPreemptable = Resource.newInstance(0, 0);
         for (CSQueue c : root.getChildQueues()) {
           TempQueue subq = cloneQueues(c, clusterResources);
@@ -816,9 +828,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
     final ArrayList<TempQueue> children;
     LeafQueue leafQueue;
+    boolean preemptionDisabled;
 
     TempQueue(String queueName, Resource current, Resource pending,
-        Resource guaranteed, Resource maxCapacity) {
+        Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
       this.queueName = queueName;
       this.current = current;
       this.pending = pending;
@@ -831,6 +844,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       this.children = new ArrayList<TempQueue>();
       this.untouchableExtra = Resource.newInstance(0, 0);
       this.preemptableExtra = Resource.newInstance(0, 0);
+      this.preemptionDisabled = preemptionDisabled;
     }
 
     public void setLeafQueue(LeafQueue l){
@@ -862,10 +876,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // the unused ones
     Resource offer(Resource avail, ResourceCalculator rc,
         Resource clusterResource) {
+      Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
+                      Resources.subtract(maxCapacity, idealAssigned),
+                      Resource.newInstance(0, 0));
       // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
       Resource accepted = 
           Resources.min(rc, clusterResource, 
-              Resources.subtract(maxCapacity, idealAssigned),
+              absMaxCapIdealAssignedDelta,
           Resources.min(rc, clusterResource, avail, Resources.subtract(
               Resources.add(current, pending), idealAssigned)));
       Resource remain = Resources.subtract(avail, accepted);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef3d9bdf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 696b9bb..8f5237e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -532,6 +532,30 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
+  public void testPerQueueDisablePreemptionOverAbsMaxCapacity() {
+    int[][] qData = new int[][] {
+        //  /    A              D
+        //            B    C         E    F
+        {1000, 725, 360, 365, 275,  17, 258 },  // absCap
+        {1000,1000,1000,1000, 550, 109,1000 },  // absMaxCap
+        {1000, 741, 396, 345, 259, 110, 149 },  // used
+        {  40,  20,   0,  20,  20,  20,   0 },  // pending
+        {   0,   0,   0,   0,   0,   0,   0 },  // reserved
+        //          appA appB     appC appD
+        {   4,   2,   1,   1,   2,   1,   1 },  // apps
+        {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+        {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    // QueueE inherits non-preemption from QueueD
+    schedConf.setPreemptionDisabled("root.queueD", true);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // appC is running on QueueE. QueueE is over absMaxCap, but is not
+    // preemptable. Therefore, appC resources should not be preempted.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
   public void testOverCapacityImbalance() {
     int[][] qData = new int[][]{
       //  /   A   B   C