You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/12/05 22:07:52 UTC
hadoop git commit: YARN-2056. Disable preemption at Queue level.
Contributed by Eric Payne
Repository: hadoop
Updated Branches:
refs/heads/trunk 3c72f54ef -> 4b1308219
YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b130821
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b130821
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b130821
Branch: refs/heads/trunk
Commit: 4b130821995a3cfe20c71e38e0f63294085c0491
Parents: 3c72f54
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Dec 5 21:06:48 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Dec 5 21:06:48 2014 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../ProportionalCapacityPreemptionPolicy.java | 170 +++++++++--
...estProportionalCapacityPreemptionPolicy.java | 283 ++++++++++++++++++-
3 files changed, 424 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b130821/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9804d61..0b88959 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -126,6 +126,8 @@ Release 2.7.0 - UNRELEASED
YARN-2301. Improved yarn container command. (Naganarasimha G R via jianhe)
+ YARN-2056. Disable preemption at Queue level (Eric Payne via jlowe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b130821/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..1a3f804 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.PriorityQueue;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -111,6 +112,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+ // Pieces of the per-queue "disable preemption" property name. The full key
+ // is BASE_YARN_RM_PREEMPTION + <queue path> + SUFFIX_DISABLE_PREEMPTION,
+ // e.g. "yarn.scheduler.capacity.root.queueB.disable_preemption".
+ public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
+ public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
+
// the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
@@ -192,7 +196,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// extract a summary of the queues from scheduler
TempQueue tRoot;
synchronized (scheduler) {
- tRoot = cloneQueues(root, clusterResources);
+ tRoot = cloneQueues(root, clusterResources, false);
}
// compute the ideal distribution of resources among queues
@@ -370,28 +374,60 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private void computeFixpointAllocation(ResourceCalculator rc,
Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
+ // Prior to assigning the unused resources, process each queue as follows:
+ // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+ // Else idealAssigned = current;
+ // Subtract idealAssigned resources from unassigned.
+ // If the queue has all of its needs met (that is, if
+ // idealAssigned >= current + pending), remove it from consideration.
+ // Sort queues from most under-guaranteed to most over-guaranteed.
+ TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+ PriorityQueue<TempQueue> orderedByNeed =
+ new PriorityQueue<TempQueue>(10,tqComparator);
+ for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueue q = i.next();
+ if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+ q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+ } else {
+ q.idealAssigned = Resources.clone(q.current);
+ }
+ Resources.subtractFrom(unassigned, q.idealAssigned);
+ // If idealAssigned < (current + pending), q needs more resources, so
+ // add it to the list of underserved queues, ordered by need.
+ Resource curPlusPend = Resources.add(q.current, q.pending);
+ if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+ orderedByNeed.add(q);
+ }
+ }
+
//assign all cluster resources until no more demand, or no resources are left
- while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
- unassigned, Resources.none())) {
+ while (!orderedByNeed.isEmpty()
+ && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
-
// we compute normalizedGuarantees capacity based on currently active
// queues
- resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);
-
- // offer for each queue their capacity first and in following invocations
- // their share of over-capacity
- for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+ resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+ // For each underserved queue (or set of queues if multiple are equally
+ // underserved), offer its share of the unassigned resources based on its
+ // normalized guarantee. After the offer, if the queue is not satisfied,
+ // place it back in the ordered list of queues, recalculating its place
+ // in the order of most under-guaranteed to most over-guaranteed. In this
+ // way, the most underserved queue(s) are always given resources first.
+ Collection<TempQueue> underserved =
+ getMostUnderservedQueues(orderedByNeed, tqComparator);
+ for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
TempQueue sub = i.next();
- Resource wQavail =
- Resources.multiply(unassigned, sub.normalizedGuarantee);
+ Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+ unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
Resource wQdone = Resources.subtract(wQavail, wQidle);
- // if the queue returned a value > 0 it means it is fully satisfied
- // and it is removed from the list of active queues qAlloc
- if (!Resources.greaterThan(rc, tot_guarant,
+
+ if (Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
- i.remove();
+ // The queue is still asking for more. Put it back in the priority
+ // queue, recalculating its order based on need.
+ orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
@@ -399,6 +435,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
}
+ /**
+  * Take the most underserved TempQueue (the one on the head). Collect and
+  * return the list of all queues that have the same idealAssigned
+  * percentage of guaranteed.
+  *
+  * Every queue placed in the returned collection has been removed from
+  * orderedByNeed; queues that compare greater than the head stay in it.
+  *
+  * @param orderedByNeed priority queue of TempQueues; it is modified by
+  *        this call
+  * @param tqComparator comparator used to detect ties with the head
+  * @return the head of orderedByNeed plus all queues that tie with it;
+  *         empty if orderedByNeed is empty
+  */
+ protected Collection<TempQueue> getMostUnderservedQueues(
+ PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
+ ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+ while (!orderedByNeed.isEmpty()) {
+ TempQueue q1 = orderedByNeed.remove();
+ underserved.add(q1);
+ // peek() returns null once the priority queue has been drained.
+ TempQueue q2 = orderedByNeed.peek();
+ // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+ // return what has already been collected. Otherwise, q1's pct of
+ // guaranteed == that of q2, so add q2 to underserved list during the
+ // next pass.
+ if (q2 == null || tqComparator.compare(q1,q2) < 0) {
+ return underserved;
+ }
+ }
+ return underserved;
+ }
+
/**
* Computes a normalizedGuaranteed capacity based on active queues
* @param rc resource calculator
@@ -626,9 +683,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
*
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
+ * @param parentDisablePreempt true if disable preemption is set for parent
* @return the root of the cloned queue hierarchy
*/
- private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
+ private TempQueue cloneQueues(CSQueue root, Resource clusterResources,
+ boolean parentDisablePreempt) {
TempQueue ret;
synchronized (root) {
String queueName = root.getQueueName();
@@ -639,19 +698,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
Resource current = Resources.multiply(clusterResources, absUsed);
Resource guaranteed = Resources.multiply(clusterResources, absCap);
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+
+ boolean queueDisablePreemption = false;
+ String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+ + SUFFIX_DISABLE_PREEMPTION;
+ queueDisablePreemption = scheduler.getConfiguration()
+ .getBoolean(queuePropName, parentDisablePreempt);
+
+ Resource extra = Resource.newInstance(0, 0);
+ if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+ extra = Resources.subtract(current, guaranteed);
+ }
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(queueName, current, pending, guaranteed,
maxCapacity);
-
+ if (queueDisablePreemption) {
+ ret.untouchableExtra = extra;
+ } else {
+ ret.preemptableExtra = extra;
+ }
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
maxCapacity);
+ Resource childrensPreemptable = Resource.newInstance(0, 0);
for (CSQueue c : root.getChildQueues()) {
- ret.addChild(cloneQueues(c, clusterResources));
+ TempQueue subq =
+ cloneQueues(c, clusterResources, queueDisablePreemption);
+ Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+ ret.addChild(subq);
+ }
+ // untouchableExtra = max(extra - childrenPreemptable, 0)
+ if (Resources.greaterThanOrEqual(
+ rc, clusterResources, childrensPreemptable, extra)) {
+ ret.untouchableExtra = Resource.newInstance(0, 0);
+ } else {
+ ret.untouchableExtra =
+ Resources.subtractFrom(extra, childrensPreemptable);
}
}
}
@@ -690,6 +776,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
+ Resource untouchableExtra;
+ Resource preemptableExtra;
double normalizedGuarantee;
@@ -708,6 +796,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<TempQueue>();
+ this.untouchableExtra = Resource.newInstance(0, 0);
+ this.preemptableExtra = Resource.newInstance(0, 0);
}
public void setLeafQueue(LeafQueue l){
@@ -761,10 +851,20 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
+ .append(" UNTOUCHABLE: ").append(untouchableExtra)
+ .append(" PREEMPTABLE: ").append(preemptableExtra)
.append("\n");
return sb.toString();
}
+
+ /**
+  * Logs this queue's toString() at INFO level, then does the same for each
+  * child queue recursively (parent first, then its children).
+  */
+ public void printAll() {
+ LOG.info(this.toString());
+ for (TempQueue sub : this.getChildren()) {
+ sub.printAll();
+ }
+ }
+
public void assignPreemption(float scalingFactor,
ResourceCalculator rc, Resource clusterResource) {
if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
@@ -793,4 +893,38 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
+ /**
+  * Orders TempQueues by idealAssigned / guaranteed in ascending order, so
+  * the queue furthest below its guarantee sorts first.
+  */
+ static class TQComparator implements Comparator<TempQueue> {
+ private final ResourceCalculator rc;
+ private final Resource clusterRes;
+
+ TQComparator(ResourceCalculator rc, Resource clusterRes) {
+ this.rc = rc;
+ this.clusterRes = clusterRes;
+ }
+
+ /**
+  * @return a negative value, zero, or a positive value when tq1's
+  *         idealAssigned / guaranteed ratio is less than, equal to, or
+  *         greater than tq2's
+  */
+ @Override
+ public int compare(TempQueue tq1, TempQueue tq2) {
+ // Each ratio is computed once. Double.compare gives a consistent total
+ // order even if a ratio turns out to be NaN, which the Comparator
+ // contract (and PriorityQueue) relies on.
+ return Double.compare(
+ getIdealPctOfGuaranteed(tq1), getIdealPctOfGuaranteed(tq2));
+ }
+
+ // Calculates idealAssigned / guaranteed
+ // TempQueues with 0 guarantees are always considered the most over
+ // capacity and therefore considered last for resources.
+ // A null queue is treated the same way (Integer.MAX_VALUE).
+ private double getIdealPctOfGuaranteed(TempQueue q) {
+ double pctOver = Integer.MAX_VALUE;
+ if (q != null && Resources.greaterThan(
+ rc, clusterRes, q.guaranteed, Resources.none())) {
+ pctOver =
+ Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+ }
+ return (pctOver);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b130821/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 24e70bb..ca67ef0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -86,6 +90,7 @@ public class TestProportionalCapacityPreemptionPolicy {
Clock mClock = null;
Configuration conf = null;
CapacityScheduler mCS = null;
+ CapacitySchedulerConfiguration schedConf = null;
EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -98,6 +103,8 @@ public class TestProportionalCapacityPreemptionPolicy {
ApplicationId.newInstance(TS, 3), 0);
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
+ // appF needs its own application id; reusing 4 made it equal to appE.
+ final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(TS, 5), 0);
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@@ -123,6 +130,8 @@ public class TestProportionalCapacityPreemptionPolicy {
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
+ schedConf = new CapacitySchedulerConfiguration();
+ when(mCS.getConfiguration()).thenReturn(schedConf);
mDisp = mock(EventHandler.class);
rand = new Random();
long seed = rand.nextLong();
@@ -266,6 +275,240 @@ public class TestProportionalCapacityPreemptionPolicy {
}
@Test
+ public void testPerQueueDisablePreemption() {
+ // Flat hierarchy: run once with disable_preemption set on queueB and once
+ // with it explicitly cleared, and compare which apps get preempted.
+ int[][] qData = new int[][]{
+ // / A B C
+ { 100, 55, 25, 20 }, // abs
+ { 100, 100, 100, 100 }, // maxCap
+ { 100, 0, 54, 46 }, // used
+ { 10, 10, 0, 0 }, // pending
+ { 0, 0, 0, 0 }, // reserved
+ // appA appB appC
+ { 3, 1, 1, 1 }, // apps
+ { -1, 1, 1, 1 }, // req granularity
+ { 3, 0, 0, 0 }, // subqueues
+ };
+
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // With PREEMPTION_DISABLED set for queueB, get resources from queueC
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // With no PREEMPTION_DISABLED set for queueB, resources will be preempted
+ // from both queueB and queueC. The test must be reset so that the mDisp
+ // event handler counts only events from the following run and not the
+ // previous one.
+ setup();
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
+ policy2.editSchedule();
+
+ verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+ verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+ @Test
+ public void testPerQueueDisablePreemptionHierarchical() {
+ // Two-level hierarchy: first run with every queue preemptable, then with
+ // disable_preemption set on root.queueA.queueB.
+ int[][] qData = new int[][] {
+ // / A D
+ // B C E F
+ { 200, 100, 50, 50, 100, 10, 90 }, // abs
+ { 200, 200, 200, 200, 200, 200, 200 }, // maxCap
+ { 200, 110, 60, 50, 90, 90, 0 }, // used
+ { 10, 0, 0, 0, 10, 0, 10 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD
+ { 4, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 2, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // verify capacity taken from queueB (appA), not queueE (appC) despite
+ // queueE being far over its absolute capacity because queueA (queueB's
+ // parent) is over capacity and queueD (queueE's parent) is not.
+ ApplicationAttemptId expectedAttemptOnQueueB =
+ ApplicationAttemptId.newInstance(
+ appA.getApplicationId(), appA.getAttemptId());
+ assertTrue("appA should be running on queueB",
+ mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+ // Need to call setup() again to reset mDisp
+ setup();
+ // Disable preemption for queueB and its children
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+ policy2.editSchedule();
+ ApplicationAttemptId expectedAttemptOnQueueC =
+ ApplicationAttemptId.newInstance(
+ appB.getApplicationId(), appB.getAttemptId());
+ ApplicationAttemptId expectedAttemptOnQueueE =
+ ApplicationAttemptId.newInstance(
+ appC.getApplicationId(), appC.getAttemptId());
+ // Now, all of queueB's (appA) over capacity is not preemptable, so neither
+ // is queueA's. Verify that capacity is taken from queueE (appC).
+ assertTrue("appB should be running on queueC",
+ mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
+ assertTrue("appC should be running on queueE",
+ mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
+ // Resources should have come from queueE (appC) and neither of queueA's
+ // children.
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+ @Test
+ public void testPerQueueDisablePreemptionBroadHierarchical() {
+ // Three parent queues with two leaves each. Runs three scenarios: nothing
+ // disabled, queueB disabled, and both queueB and queueE disabled.
+ int[][] qData = new int[][] {
+ // / A D G
+ // B C E F H I
+ {1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
+ {1000, 400, 200, 200, 400, 250, 150, 200, 150, 50 }, // used
+ { 50, 0, 0, 0, 50, 0, 50, 0, 0, 0 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD appE appF
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // queueF(appD) wants resources. Verify that resources come from queueE(appC)
+ // because it's a sibling and queueB(appA) because queueA is over capacity.
+ verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // Need to call setup() again to reset mDisp
+ setup();
+ // Disable preemption for queueB(appA)
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+ policy2.editSchedule();
+ // Now that queueB(appA) is not preemptable, verify that resources come
+ // from queueE(appC)
+ verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+ setup();
+ // Disable preemption for two of the 3 queues with over-capacity.
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
+ ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
+ policy3.editSchedule();
+
+ // Verify that the request was starved out even though queueH(appE) is
+ // over capacity. This is because queueG (queueH's parent) is NOT
+ // overcapacity.
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+ }
+
+ @Test
+ public void testPerQueueDisablePreemptionInheritParent() {
+ // Checks that disable_preemption set on a parent queue (queueA) also
+ // protects that parent's leaf queues.
+ int[][] qData = new int[][] {
+ // / A E
+ // B C D F G H
+ {1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar)
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
+ {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used
+ { 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD appE
+ { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps
+ { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
+ { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
+ };
+
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // With all queues preemptable, resources should be taken from queueC(appA)
+ // and queueD(appB). Resources taken more from queueD(appB) than
+ // queueC(appA) because it's over its capacity by a larger percentage.
+ verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+ // Disable preemption for queueA and its children. queueF(appC)'s request
+ // should starve.
+ setup(); // Call setup() to reset mDisp
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
+ ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
+ policy2.editSchedule();
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+ }
+
+ @Test
+ public void testPerQueuePreemptionNotAllUntouchable() {
+ // Only queueC has disable_preemption set; its sibling queueD is also over
+ // capacity and must still be preempted.
+ int[][] qData = new int[][] {
+ // / A E
+ // B C D F G H
+ { 2000, 1000, 800, 100, 100, 1000, 500, 300, 200 }, // abs
+ { 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 }, // maxCap
+ { 2000, 1300, 300, 800, 200, 700, 500, 0, 200 }, // used
+ { 300, 0, 0, 0, 0, 300, 0, 300, 0 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD appE appF
+ { 6, 3, 1, 1, 1, 3, 1, 1, 1 }, // apps
+ { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
+ { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
+ };
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ policy.editSchedule();
+ // Although queueC(appB) is way over capacity and is untouchable,
+ // queueD(appC) is preemptable. Request should be filled from queueD(appC).
+ verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ }
+
+ @Test
+ public void testPerQueueDisablePreemptionRootDisablesAll() {
+ // Sets disable_preemption on root only and expects no preemption requests
+ // for any of the checked apps.
+ int[][] qData = new int[][] {
+ // / A D G
+ // B C E F H I
+ {1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs
+ {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
+ {1000, 20, 0, 20, 490, 240, 250, 490, 240, 250 }, // used
+ { 200, 200, 200, 0, 0, 0, 0, 0, 0, 0 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD appE appF
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
+ };
+
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ // The flag is set after the policy is built but before editSchedule().
+ schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ + "root" + SUFFIX_DISABLE_PREEMPTION, true);
+ policy.editSchedule();
+ // All queues should be non-preemptable, so request should starve.
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueF
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
+ verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
+ }
+
+ @Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
// / A B C
@@ -341,7 +584,7 @@ public class TestProportionalCapacityPreemptionPolicy {
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
- verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@@ -390,15 +633,17 @@ public class TestProportionalCapacityPreemptionPolicy {
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
- // / A B C D E F G H I
- { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
- { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
- { 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
- { 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
- { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
- { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
- { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
- { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
+ // / A D G
+ // B C E F H I
+ { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
+ { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
+ { 400, 210, 70, 140, 100, 50, 50, 90, 90, 0 }, // used
+ { 15, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
+ { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
+ // appA appB appC appD appE appF
+ { 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
+ { -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
+ { 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
@@ -407,8 +652,8 @@ public class TestProportionalCapacityPreemptionPolicy {
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
- verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
+ verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
@@ -629,6 +874,7 @@ public class TestProportionalCapacityPreemptionPolicy {
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+ when(root.getQueuePath()).thenReturn("root");
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
@@ -644,6 +890,10 @@ public class TestProportionalCapacityPreemptionPolicy {
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+ String parentPathName = p.getQueuePath();
+ parentPathName = (parentPathName == null) ? "root" : parentPathName;
+ String queuePathName = (parentPathName+"."+queueName).replace("/","root");
+ when(q.getQueuePath()).thenReturn(queuePathName);
}
assert 0 == pqs.size();
return root;
@@ -666,6 +916,8 @@ public class TestProportionalCapacityPreemptionPolicy {
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
+ List<ApplicationAttemptId> appAttemptIdList =
+ new ArrayList<ApplicationAttemptId>();
when(lq.getTotalResourcePending()).thenReturn(
Resource.newInstance(pending[i], 0));
// consider moving where CapacityScheduler::comparator accessible
@@ -683,9 +935,14 @@ public class TestProportionalCapacityPreemptionPolicy {
int aPending = pending[i] / apps[i];
int aReserve = reserved[i] / apps[i];
for (int a = 0; a < apps[i]; ++a) {
- qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
+ FiCaSchedulerApp mockFiCaApp =
+ mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
+ qApps.add(mockFiCaApp);
++appAlloc;
+ appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
}
+ when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
+ .thenReturn(appAttemptIdList);
}
when(lq.getApplications()).thenReturn(qApps);
if(setAMResourcePercent != 0.0f){