You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/07/13 20:59:54 UTC
[42/48] hadoop git commit: YARN-3849. Too much of preemption activity
causing continuous killing of containers across queues. (Sunil G via wangda)
YARN-3849. Too much of preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7e14587
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7e14587
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7e14587
Branch: refs/heads/YARN-2928
Commit: a7e145873b0b9f80cd23b6d1cf0173173ed4c4d3
Parents: 18e891b
Author: Wangda Tan <wa...@apache.org>
Authored: Sat Jul 11 10:26:46 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jul 13 11:51:16 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../ProportionalCapacityPreemptionPolicy.java | 4 +-
...estProportionalCapacityPreemptionPolicy.java | 253 ++++++++++++++-----
...pacityPreemptionPolicyForNodePartitions.java | 114 +++++++--
4 files changed, 289 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ae5e9d6..db24ad2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -725,6 +725,9 @@ Release 2.8.0 - UNRELEASED
YARN-3888. ApplicationMaster link is broken in RM WebUI when appstate is NEW.
(Bibin A Chundatt via xgong)
+ YARN-3849. Too much of preemption activity causing continuos killing of
+ containers across queues. (Sunil G via wangda)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 5a20a6f..6e661d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -840,12 +840,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
synchronized (curQueue) {
String queueName = curQueue.getQueueName();
QueueCapacities qc = curQueue.getQueueCapacities();
- float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
- Resource current = Resources.multiply(partitionResource, absUsed);
+ Resource current = curQueue.getQueueResourceUsage().getUsed(
+ partitionToLookAt);
Resource guaranteed = Resources.multiply(partitionResource, absCap);
Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 2c0c6d7..3057360 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -81,7 +81,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -372,7 +374,7 @@ public class TestProportionalCapacityPreemptionPolicy {
appA.getApplicationId(), appA.getAttemptId());
assertTrue("appA should be running on queueB",
mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
// Need to call setup() again to reset mDisp
setup();
@@ -395,7 +397,7 @@ public class TestProportionalCapacityPreemptionPolicy {
// Resources should have come from queueE (appC) and neither of queueA's
// children.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
@@ -470,8 +472,8 @@ public class TestProportionalCapacityPreemptionPolicy {
// With all queues preemptable, resources should be taken from queueC(appA)
// and queueD(appB). Resources taken more from queueD(appB) than
// queueC(appA) because it's over its capacity by a larger percentage.
- verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
- verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+ verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB)));
// Turn off preemption for queueA and its children. queueF(appC)'s request
// should starve.
@@ -635,7 +637,7 @@ public class TestProportionalCapacityPreemptionPolicy {
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@@ -676,7 +678,7 @@ public class TestProportionalCapacityPreemptionPolicy {
// we verify both that C has priority on B and D (as it has >0 guarantees)
// and that B and D are forced to share their over capacity fairly (as they
// are both zero-guarantees) hence D sees some of its containers preempted
- verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@@ -703,8 +705,8 @@ public class TestProportionalCapacityPreemptionPolicy {
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
- verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
- verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
+ verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
@@ -868,6 +870,34 @@ public class TestProportionalCapacityPreemptionPolicy {
setAMContainer = false;
}
+ @Test
+ public void testPreemptionWithVCoreResource() {
+ int[][] qData = new int[][]{
+ // / A B
+ {100, 100, 100}, // maxcap
+ {5, 1, 1}, // apps
+ {2, 0, 0}, // subqueues
+ };
+
+ // Resources can be set like memory:vcores
+ String[][] resData = new String[][]{
+ // / A B
+ {"100:100", "50:50", "50:50"}, // abs
+ {"10:100", "10:100", "0"}, // used
+ {"70:20", "70:20", "10:100"}, // pending
+ {"0", "0", "0"}, // reserved
+ {"-1", "1:10", "1:10"}, // req granularity
+ };
+
+ // Passing last param as TRUE to use DominantResourceCalculator
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+ true);
+ policy.editSchedule();
+
+ // 5 containers will be preempted here
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@@ -892,13 +922,40 @@ public class TestProportionalCapacityPreemptionPolicy {
}
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
- ProportionalCapacityPreemptionPolicy policy =
- new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+ ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
+ conf, rmContext, mCS, mClock);
+ clusterResources = Resource.newInstance(
+ leafAbsCapacities(qData[0], qData[7]), 0);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
- clusterResources =
- Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
+ setResourceAndNodeDetails();
+ return policy;
+ }
+
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+ String[][] resData) {
+ return buildPolicy(qData, resData, false);
+ }
+
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+ String[][] resData, boolean useDominantResourceCalculator) {
+ if (useDominantResourceCalculator) {
+ when(mCS.getResourceCalculator()).thenReturn(
+ new DominantResourceCalculator());
+ }
+ ProportionalCapacityPreemptionPolicy policy =
+ new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+ clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
+ qData[2]);
+ ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
+ when(mCS.getRootQueue()).thenReturn(mRoot);
+
+ setResourceAndNodeDetails();
+ return policy;
+ }
+
+ private void setResourceAndNodeDetails() {
when(mCS.getClusterResource()).thenReturn(clusterResources);
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
clusterResources);
@@ -906,35 +963,78 @@ public class TestProportionalCapacityPreemptionPolicy {
SchedulerNode mNode = mock(SchedulerNode.class);
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
- return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
- int[] abs = queueData[0];
- int[] maxCap = queueData[1];
- int[] used = queueData[2];
- int[] pending = queueData[3];
- int[] reserved = queueData[4];
- int[] apps = queueData[5];
- int[] gran = queueData[6];
- int[] queues = queueData[7];
-
- return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+ Resource[] abs = generateResourceList(queueData[0]);
+ Resource[] used = generateResourceList(queueData[2]);
+ Resource[] pending = generateResourceList(queueData[3]);
+ Resource[] reserved = generateResourceList(queueData[4]);
+ Resource[] gran = generateResourceList(queueData[6]);
+ int[] maxCap = queueData[1];
+ int[] apps = queueData[5];
+ int[] queues = queueData[7];
+
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
- ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
- int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
- float tot = leafAbsCapacities(abs, queues);
+ ParentQueue buildMockRootQueue(Random r, String[][] resData,
+ int[]... queueData) {
+ Resource[] abs = parseResourceDetails(resData[0]);
+ Resource[] used = parseResourceDetails(resData[1]);
+ Resource[] pending = parseResourceDetails(resData[2]);
+ Resource[] reserved = parseResourceDetails(resData[3]);
+ Resource[] gran = parseResourceDetails(resData[4]);
+ int[] maxCap = queueData[0];
+ int[] apps = queueData[1];
+ int[] queues = queueData[2];
+
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+ }
+
+ Resource[] parseResourceDetails(String[] resData) {
+ List<Resource> resourceList = new ArrayList<Resource>();
+ for (int i = 0; i < resData.length; i++) {
+ String[] resource = resData[i].split(":");
+ if (resource.length == 1) {
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
+ } else {
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
+ Integer.valueOf(resource[1])));
+ }
+ }
+ return resourceList.toArray(new Resource[resourceList.size()]);
+ }
+
+ Resource[] generateResourceList(int[] qData) {
+ List<Resource> resourceList = new ArrayList<Resource>();
+ for (int i = 0; i < qData.length; i++) {
+ resourceList.add(Resource.newInstance(qData[i], 0));
+ }
+ return resourceList.toArray(new Resource[resourceList.size()]);
+ }
+
+ ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
+ Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
+ int[] queues) {
+ ResourceCalculator rc = mCS.getResourceCalculator();
+ Resource tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
+ ResourceUsage resUsage = new ResourceUsage();
+ resUsage.setUsed(used[0]);
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
- when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
- when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
- when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+ when(root.getAbsoluteUsedCapacity()).thenReturn(
+ Resources.divide(rc, tot, used[0], tot));
+ when(root.getAbsoluteCapacity()).thenReturn(
+ Resources.divide(rc, tot, abs[0], tot));
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(
+ maxCap[0] / (float) tot.getMemory());
+ when(root.getQueueResourceUsage()).thenReturn(resUsage);
QueueCapacities rootQc = new QueueCapacities(true);
- rootQc.setAbsoluteUsedCapacity(used[0] / tot);
- rootQc.setAbsoluteCapacity(abs[0] / tot);
- rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+ rootQc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[0], tot));
+ rootQc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[0], tot));
+ rootQc.setAbsoluteMaximumCapacity(maxCap[0] / (float) tot.getMemory());
when(root.getQueueCapacities()).thenReturn(rootQc);
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
boolean preemptionDisabled = mockPreemptionStatus("root");
@@ -943,28 +1043,35 @@ public class TestProportionalCapacityPreemptionPolicy {
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
- final String queueName = "queue" + ((char)('A' + i - 1));
+ final String queueName = "queue" + ((char) ('A' + i - 1));
if (queues[i] > 0) {
q = mockParentQueue(p, queues[i], pqs);
+ ResourceUsage resUsagePerQueue = new ResourceUsage();
+ resUsagePerQueue.setUsed(used[i]);
+ when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
}
when(q.getParent()).thenReturn(p);
when(q.getQueueName()).thenReturn(queueName);
- when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
- when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
- when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+ when(q.getAbsoluteUsedCapacity()).thenReturn(
+ Resources.divide(rc, tot, used[i], tot));
+ when(q.getAbsoluteCapacity()).thenReturn(
+ Resources.divide(rc, tot, abs[i], tot));
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(
+ maxCap[i] / (float) tot.getMemory());
// We need to make these fields to QueueCapacities
QueueCapacities qc = new QueueCapacities(false);
- qc.setAbsoluteUsedCapacity(used[i] / tot);
- qc.setAbsoluteCapacity(abs[i] / tot);
- qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+ qc.setAbsoluteUsedCapacity(Resources.divide(rc, tot, used[i], tot));
+ qc.setAbsoluteCapacity(Resources.divide(rc, tot, abs[i], tot));
+ qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemory());
when(q.getQueueCapacities()).thenReturn(qc);
String parentPathName = p.getQueuePath();
parentPathName = (parentPathName == null) ? "root" : parentPathName;
- String queuePathName = (parentPathName+"."+queueName).replace("/","root");
+ String queuePathName = (parentPathName + "." + queueName).replace("/",
+ "root");
when(q.getQueuePath()).thenReturn(queuePathName);
preemptionDisabled = mockPreemptionStatus(queuePathName);
when(q.getPreemptionDisabled()).thenReturn(preemptionDisabled);
@@ -1004,16 +1111,18 @@ public class TestProportionalCapacityPreemptionPolicy {
}
@SuppressWarnings("rawtypes")
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
- int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
+ Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
+ Resource[] gran) {
LeafQueue lq = mock(LeafQueue.class);
+ ResourceCalculator rc = mCS.getResourceCalculator();
List<ApplicationAttemptId> appAttemptIdList =
new ArrayList<ApplicationAttemptId>();
- when(lq.getTotalResourcePending()).thenReturn(
- Resource.newInstance(pending[i], 0));
+ when(lq.getTotalResourcePending()).thenReturn(pending[i]);
// need to set pending resource in resource usage as well
ResourceUsage ru = new ResourceUsage();
- ru.setPending(Resource.newInstance(pending[i], 0));
+ ru.setPending(pending[i]);
+ ru.setUsed(used[i]);
when(lq.getQueueResourceUsage()).thenReturn(ru);
// consider moving where CapacityScheduler::comparator accessible
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
@@ -1026,9 +1135,9 @@ public class TestProportionalCapacityPreemptionPolicy {
});
// applications are added in global L->R order in queues
if (apps[i] != 0) {
- int aUsed = used[i] / apps[i];
- int aPending = pending[i] / apps[i];
- int aReserve = reserved[i] / apps[i];
+ Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
+ Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
+ Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
for (int a = 0; a < apps[i]; ++a) {
FiCaSchedulerApp mockFiCaApp =
mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
@@ -1055,9 +1164,10 @@ public class TestProportionalCapacityPreemptionPolicy {
return lq;
}
- FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
- int gran) {
+ FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
+ Resource reserved, Resource gran) {
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+ ResourceCalculator rc = mCS.getResourceCalculator();
ApplicationId appId = ApplicationId.newInstance(TS, id);
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
@@ -1065,30 +1175,35 @@ public class TestProportionalCapacityPreemptionPolicy {
when(app.getApplicationAttemptId()).thenReturn(appAttId);
int cAlloc = 0;
- Resource unit = Resource.newInstance(gran, 0);
+ Resource unit = gran;
List<RMContainer> cReserved = new ArrayList<RMContainer>();
- for (int i = 0; i < reserved; i += gran) {
- cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
- .getValue()));
+ Resource resIter = Resource.newInstance(0, 0);
+ for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources
+ .addTo(resIter, gran)) {
+ cReserved.add(mockContainer(appAttId, cAlloc, unit,
+ priority.CONTAINER.getValue()));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
List<RMContainer> cLive = new ArrayList<RMContainer>();
- for (int i = 0; i < used; i += gran) {
- if(setAMContainer && i == 0){
- cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
- .getValue()));
- }else if(setLabeledContainer && i ==1){
+ Resource usedIter = Resource.newInstance(0, 0);
+ int i = 0;
+ for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources
+ .addTo(usedIter, gran)) {
+ if (setAMContainer && i == 0) {
+ cLive.add(mockContainer(appAttId, cAlloc, unit,
+ priority.AMCONTAINER.getValue()));
+ } else if (setLabeledContainer && i == 1) {
cLive.add(mockContainer(appAttId, cAlloc, unit,
priority.LABELEDCONTAINER.getValue()));
- ++used;
- }
- else{
- cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
- .getValue()));
+ Resources.addTo(used, Resource.newInstance(1, 1));
+ } else {
+ cLive.add(mockContainer(appAttId, cAlloc, unit,
+ priority.CONTAINER.getValue()));
}
++cAlloc;
+ ++i;
}
when(app.getLiveContainers()).thenReturn(cLive);
return app;
@@ -1124,6 +1239,16 @@ public class TestProportionalCapacityPreemptionPolicy {
return ret;
}
+ static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
+ Resource ret = Resource.newInstance(0, 0);
+ for (int i = 0; i < abs.length; ++i) {
+ if (0 == subqueues[i]) {
+ Resources.addTo(ret, abs[i]);
+ }
+ }
+ return ret;
+ }
+
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7e14587/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 114769c..b3ac79b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
@@ -771,6 +772,60 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
}
+ @Test
+ public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Both a/b can access x, and their guaranteed capacity is split 50:50. Two
+ * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 4 applications in the cluster:
+ * app1/app2 in a, and app3/app4 in b. app1 uses 80 x, app2 uses 20
+ * NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL. Both a/b have 50 pending
+ * resource for x and NO_LABEL.
+ *
+ * After preemption, it should preempt 30 from app1, and 30 from app4.
+ */
+ String labelsConfig = "=100:200,true;" + // default partition
+ "x=100:200,true"; // partition=x
+ String nodesConfig = "n1=x;" + // n1 has partition=x
+ "n2="; // n2 is default partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[100:200 100:200 100:200 100:200],x=[100:200 100:200 100:200 100:200]);"
+ + // root
+ "-a(=[50:100 100:200 20:40 50:100],x=[50:100 100:200 80:160 50:100]);" + // a
+ "-b(=[50:100 100:200 80:160 50:100],x=[50:100 100:200 20:40 50:100])"; // b
+ String appsConfig =
+ // queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1:2,n1,x,80,false);" + // 80 * x in n1
+ "a\t" // app2 in a
+ + "(1,1:2,n2,,20,false);" + // 20 default in n2
+ "b\t" // app3 in b
+ + "(1,1:2,n1,x,20,false);" + // 20 * x in n1
+ "b\t" // app4 in b
+ + "(1,1:2,n2,,80,false)"; // 80 default in n2
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+ policy.editSchedule();
+
+ // 30 preempted from app1, 30 preempted from app4, and nothing preempted
+ // from app2/app3
+ verify(mDisp, times(30)).handle(
+ argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+ verify(mDisp, times(30)).handle(
+ argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+ verify(mDisp, never()).handle(
+ argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+ verify(mDisp, never()).handle(
+ argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+ }
private ApplicationAttemptId getAppAttemptId(int id) {
ApplicationId appId = ApplicationId.newInstance(0L, id);
@@ -821,6 +876,16 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
private void buildEnv(String labelsConfig, String nodesConfig,
String queuesConfig, String appsConfig) throws IOException {
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
+ }
+
+ private void buildEnv(String labelsConfig, String nodesConfig,
+ String queuesConfig, String appsConfig,
+ boolean useDominantResourceCalculator) throws IOException {
+ if (useDominantResourceCalculator) {
+ when(cs.getResourceCalculator()).thenReturn(
+ new DominantResourceCalculator());
+ }
mockNodeLabelsManager(labelsConfig);
mockSchedulerNodes(nodesConfig);
for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
@@ -832,7 +897,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
when(cs.getClusterResource()).thenReturn(clusterResource);
mockApplications(appsConfig);
- policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
+ policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
+ mClock);
}
private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
@@ -868,7 +934,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
+ "(priority,resource,host,expression,repeat,reserved)");
}
Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
- Resource res = Resources.createResource(Integer.valueOf(values[1]));
+ Resource res = parseResourceFromString(values[1]);
NodeId host = NodeId.newInstance(values[2], 1);
String exp = values[3];
int repeat = Integer.valueOf(values[4]);
@@ -1002,11 +1068,10 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
clusterResource = Resources.createResource(0);
for (String p : partitionConfigArr) {
String partitionName = p.substring(0, p.indexOf("="));
- int totalResource =
- Integer.valueOf(p.substring(p.indexOf("=") + 1, p.indexOf(",")));
- boolean exclusivity =
+ Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
+ p.indexOf(",")));
+ boolean exclusivity =
Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length()));
- Resource res = Resources.createResource(totalResource);
when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
.thenReturn(res);
when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
@@ -1022,6 +1087,18 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
partitionToResource.keySet());
}
+ private Resource parseResourceFromString(String p) {
+ String[] resource = p.split(":");
+ Resource res = Resources.createResource(0);
+ if (resource.length == 1) {
+ res = Resources.createResource(Integer.valueOf(resource[0]));
+ } else {
+ res = Resources.createResource(Integer.valueOf(resource[0]),
+ Integer.valueOf(resource[1]));
+ }
+ return res;
+ }
+
/**
* Format is:
* <pre>
@@ -1120,23 +1197,22 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
// Add a small epsilon to capacities to avoid truncate when doing
// Resources.multiply
float epsilon = 1e-6f;
- float absGuaranteed =
- Integer.valueOf(values[0].trim())
- / (float) (partitionToResource.get(partitionName).getMemory())
- + epsilon;
- float absMax =
- Integer.valueOf(values[1].trim())
- / (float) (partitionToResource.get(partitionName).getMemory())
- + epsilon;
- float absUsed =
- Integer.valueOf(values[2].trim())
- / (float) (partitionToResource.get(partitionName).getMemory())
- + epsilon;
- Resource pending = Resources.createResource(Integer.valueOf(values[3].trim()));
+ Resource totResoucePerPartition = partitionToResource.get(partitionName);
+ float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[0].trim()), totResoucePerPartition)
+ + epsilon;
+ float absMax = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[1].trim()), totResoucePerPartition)
+ + epsilon;
+ float absUsed = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[2].trim()), totResoucePerPartition)
+ + epsilon;
+ Resource pending = parseResourceFromString(values[3].trim());
qc.setAbsoluteCapacity(partitionName, absGuaranteed);
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
ru.setPending(partitionName, pending);
+ ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ ",abs_used" + absUsed + ",pending_resource=" + pending + "]");