You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2016/01/11 07:38:59 UTC
hadoop git commit: YARN-3849. Too much of preemption activity causing
continuous killing of containers across queues. (Sunil G via wangda)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6 b91715bc8 -> 58a6142c1
YARN-3849. Too much of preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58a6142c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58a6142c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58a6142c
Branch: refs/heads/branch-2.6
Commit: 58a6142c1455cb622ffc7d520df30d8f6dc5680b
Parents: b91715b
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Mon Jan 11 12:02:38 2016 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Mon Jan 11 12:02:38 2016 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../ProportionalCapacityPreemptionPolicy.java | 3 +-
...estProportionalCapacityPreemptionPolicy.java | 198 +++++++++++++++----
3 files changed, 165 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e46f2f7..8f35efc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -43,6 +43,8 @@ Release 2.6.4 - UNRELEASED
YARN-4180. AMLauncher does not retry on failures when talking to NM.
(adhoot)
+ YARN-3849. Too much of preemption activity causing continuous killing of
+ containers across queues. (Sunil G via wangda)
Release 2.6.3 - 2015-12-17
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 0f48b0c..854d2c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -632,11 +632,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
TempQueue ret;
synchronized (root) {
String queueName = root.getQueueName();
- float absUsed = root.getAbsoluteUsedCapacity();
float absCap = root.getAbsoluteCapacity();
float absMaxCap = root.getAbsoluteMaximumCapacity();
- Resource current = Resources.multiply(clusterResources, absUsed);
+ Resource current = root.getQueueResourceUsage().getUsed();
Resource guaranteed = Resources.multiply(clusterResources, absCap);
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 24e70bb..bca2def 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -67,7 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -382,7 +385,7 @@ public class TestProportionalCapacityPreemptionPolicy {
// we verify both that C has priority on B and D (as it has >0 guarantees)
// and that B and D are forced to share their over capacity fairly (as they
// are both zero-guarantees) hence D sees some of its containers preempted
- verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+ verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@@ -407,8 +410,8 @@ public class TestProportionalCapacityPreemptionPolicy {
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
- verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
- verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
+ verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
@@ -571,7 +574,35 @@ public class TestProportionalCapacityPreemptionPolicy {
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
setAMContainer = false;
}
-
+
+ @Test
+ public void testPreemptionWithVCoreResource() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 100, 100, 100 }, // maxcap
+ { 5, 1, 1 }, // apps
+ { 2, 0, 0 }, // subqueues
+ };
+
+ // Resources can be set like memory:vcores
+ String[][] resData = new String[][] {
+ // / A B
+ { "100:100", "50:50", "50:50" },// abs
+ { "10:100", "10:100", "0" }, // used
+ { "70:20", "70:20", "10:100" }, // pending
+ { "0", "0", "0" }, // reserved
+ { "-1", "1:10", "1:10" }, // req granularity
+ };
+
+ // Passing last param as TRUE to use DominantResourceCalculator
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+ true);
+ policy.editSchedule();
+
+ // 5 containers will be preempted here
+ verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+ }
+
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@@ -598,37 +629,103 @@ public class TestProportionalCapacityPreemptionPolicy {
ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
ProportionalCapacityPreemptionPolicy policy =
new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+ Resource clusterResources =
+ Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
- Resource clusterResources =
- Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy;
}
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+ String[][] resData) {
+ return buildPolicy(qData, resData, false);
+ }
+
+ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+ String[][] resData, boolean useDominantResourceCalculator) {
+ if (useDominantResourceCalculator) {
+ when(mCS.getResourceCalculator()).thenReturn(
+ new DominantResourceCalculator());
+ }
+ ProportionalCapacityPreemptionPolicy policy =
+ new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+ Resource clusterResources = leafAbsCapacities(
+ parseResourceDetails(resData[0]), qData[2]);
+ when(mCS.getClusterResource()).thenReturn(clusterResources);
+ ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
+ when(mCS.getRootQueue()).thenReturn(mRoot);
+
+ return policy;
+ }
+
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
- int[] abs = queueData[0];
- int[] maxCap = queueData[1];
- int[] used = queueData[2];
- int[] pending = queueData[3];
- int[] reserved = queueData[4];
- int[] apps = queueData[5];
- int[] gran = queueData[6];
- int[] queues = queueData[7];
+ Resource[] abs = generateResourceList(queueData[0]);
+ Resource[] used = generateResourceList(queueData[2]);
+ Resource[] pending = generateResourceList(queueData[3]);
+ Resource[] reserved = generateResourceList(queueData[4]);
+ Resource[] gran = generateResourceList(queueData[6]);
+ int[] maxCap = queueData[1];
+ int[] apps = queueData[5];
+ int[] queues = queueData[7];
+
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+ }
+
+ private ParentQueue buildMockRootQueue(Random rand2, String[][] resData,
+ int[][] queueData) {
+ Resource[] abs = parseResourceDetails(resData[0]);
+ Resource[] used = parseResourceDetails(resData[1]);
+ Resource[] pending = parseResourceDetails(resData[2]);
+ Resource[] reserved = parseResourceDetails(resData[3]);
+ Resource[] gran = parseResourceDetails(resData[4]);
+ int[] maxCap = queueData[0];
+ int[] apps = queueData[1];
+ int[] queues = queueData[2];
+
+ return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+ }
+
+ Resource[] parseResourceDetails(String[] resData) {
+ List<Resource> resourceList = new ArrayList<Resource>();
+ for (int i = 0; i < resData.length; i++) {
+ String[] resource = resData[i].split(":");
+ if (resource.length == 1) {
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
+ } else {
+ resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
+ Integer.valueOf(resource[1])));
+ }
+ }
+ return resourceList.toArray(new Resource[resourceList.size()]);
+ }
- return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+ Resource[] generateResourceList(int[] qData) {
+ List<Resource> resourceList = new ArrayList<Resource>();
+ for (int i = 0; i < qData.length; i++) {
+ resourceList.add(Resource.newInstance(qData[i], 0));
+ }
+ return resourceList.toArray(new Resource[resourceList.size()]);
}
- ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
- int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
- float tot = leafAbsCapacities(abs, queues);
+ ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
+ Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
+ int[] queues) {
+ ResourceCalculator rc = mCS.getResourceCalculator();
+ Resource tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
+ ResourceUsage resUsage = new ResourceUsage();
+ resUsage.setUsed(used[0]);
when(root.getQueueName()).thenReturn("/");
- when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
- when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
- when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+ when(root.getAbsoluteUsedCapacity()).thenReturn(
+ Resources.divide(rc, tot, used[0], tot));
+ when(root.getAbsoluteCapacity()).thenReturn(
+ Resources.divide(rc, tot, abs[0], tot));
+ when(root.getAbsoluteMaximumCapacity()).thenReturn(
+ maxCap[0] / (float) tot.getMemory());
+ when(root.getQueueResourceUsage()).thenReturn(resUsage);
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
@@ -636,14 +733,20 @@ public class TestProportionalCapacityPreemptionPolicy {
final String queueName = "queue" + ((char)('A' + i - 1));
if (queues[i] > 0) {
q = mockParentQueue(p, queues[i], pqs);
+ ResourceUsage resUsagePerQueue = new ResourceUsage();
+ resUsagePerQueue.setUsed(used[i]);
+ when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
}
when(q.getParent()).thenReturn(p);
when(q.getQueueName()).thenReturn(queueName);
- when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
- when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
- when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+ when(q.getAbsoluteUsedCapacity()).thenReturn(
+ Resources.divide(rc, tot, used[i], tot));
+ when(q.getAbsoluteCapacity()).thenReturn(
+ Resources.divide(rc, tot, abs[i], tot));
+ when(q.getAbsoluteMaximumCapacity()).thenReturn(
+ maxCap[i] / (float) tot.getMemory());
}
assert 0 == pqs.size();
return root;
@@ -663,11 +766,17 @@ public class TestProportionalCapacityPreemptionPolicy {
return pq;
}
- LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
- int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
+ Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
+ Resource[] gran) {
LeafQueue lq = mock(LeafQueue.class);
- when(lq.getTotalResourcePending()).thenReturn(
- Resource.newInstance(pending[i], 0));
+ ResourceCalculator rc = mCS.getResourceCalculator();
+ when(lq.getTotalResourcePending()).thenReturn(pending[i]);
+ // need to set pending resource in resource usage as well
+ ResourceUsage ru = new ResourceUsage();
+ ru.setPending(pending[i]);
+ ru.setUsed(used[i]);
+ when(lq.getQueueResourceUsage()).thenReturn(ru);
// consider moving where CapacityScheduler::comparator accessible
NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
new Comparator<FiCaSchedulerApp>() {
@@ -679,9 +788,9 @@ public class TestProportionalCapacityPreemptionPolicy {
});
// applications are added in global L->R order in queues
if (apps[i] != 0) {
- int aUsed = used[i] / apps[i];
- int aPending = pending[i] / apps[i];
- int aReserve = reserved[i] / apps[i];
+ Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
+ Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
+ Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
for (int a = 0; a < apps[i]; ++a) {
qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
++appAlloc;
@@ -695,9 +804,10 @@ public class TestProportionalCapacityPreemptionPolicy {
return lq;
}
- FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
- int gran) {
+ FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
+ Resource reserved, Resource gran) {
FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+ ResourceCalculator rc = mCS.getResourceCalculator();
ApplicationId appId = ApplicationId.newInstance(TS, id);
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
@@ -705,22 +815,28 @@ public class TestProportionalCapacityPreemptionPolicy {
when(app.getApplicationAttemptId()).thenReturn(appAttId);
int cAlloc = 0;
- Resource unit = Resource.newInstance(gran, 0);
+ Resource unit = gran;
List<RMContainer> cReserved = new ArrayList<RMContainer>();
- for (int i = 0; i < reserved; i += gran) {
+ Resource resIter = Resource.newInstance(0, 0);
+ for (; Resources.lessThan(rc, mCS.getClusterResource(), resIter, reserved); Resources
+ .addTo(resIter, gran)) {
cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
List<RMContainer> cLive = new ArrayList<RMContainer>();
- for (int i = 0; i < used; i += gran) {
+ Resource usedIter = Resource.newInstance(0, 0);
+ int i = 0;
+ for (; Resources.lessThan(rc, mCS.getClusterResource(), usedIter, used); Resources
+ .addTo(usedIter, gran)) {
if(setAMContainer && i == 0){
cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
}else{
cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
}
++cAlloc;
+ ++i;
}
when(app.getLiveContainers()).thenReturn(cLive);
return app;
@@ -752,6 +868,16 @@ public class TestProportionalCapacityPreemptionPolicy {
return ret;
}
+ static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
+ Resource ret = Resource.newInstance(0, 0);
+ for (int i = 0; i < abs.length; ++i) {
+ if (0 == subqueues[i]) {
+ Resources.addTo(ret, abs[i]);
+ }
+ }
+ return ret;
+ }
+
void printString(CSQueue nq, String indent) {
if (nq instanceof ParentQueue) {
System.out.println(indent + nq.getQueueName()