You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/01/20 01:55:41 UTC
hadoop git commit: YARN-2933. Capacity Scheduler preemption policy
should only consider capacity without labels temporarily. Contributed by
Mayank Bansal
Repository: hadoop
Updated Branches:
refs/heads/trunk cb0a15d20 -> 0a2d3e717
YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a2d3e71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a2d3e71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a2d3e71
Branch: refs/heads/trunk
Commit: 0a2d3e717d9c42090a32ff177991a222a1e34132
Parents: cb0a15d
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Jan 19 16:48:50 2015 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Jan 19 16:48:50 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../ProportionalCapacityPreemptionPolicy.java | 53 +++++++++-
...estProportionalCapacityPreemptionPolicy.java | 106 +++++++++++++++++--
3 files changed, 149 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5670963..a29d316 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -391,6 +391,9 @@ Release 2.7.0 - UNRELEASED
YARN-3015. yarn classpath command should support same options as hadoop
classpath. (Contributed by Varun Saxena)
+ YARN-2933. Capacity Scheduler preemption policy should only consider capacity
+ without labels temporarily. (Mayank Bansal via wangda)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 1a3f804..0743f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -30,15 +30,19 @@ import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@@ -129,6 +133,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor;
private boolean observeOnly;
+ private Map<NodeId, Set<String>> labels;
public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock();
@@ -168,6 +173,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
+ labels = null;
}
@VisibleForTesting
@@ -176,14 +182,39 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
@Override
- public void editSchedule(){
+ public void editSchedule() {
CSQueue root = scheduler.getRootQueue();
- Resource clusterResources =
- Resources.clone(scheduler.getClusterResource());
+ Resource clusterResources = Resources.clone(scheduler.getClusterResource());
+ clusterResources = getNonLabeledResources(clusterResources);
+ setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
+ .getNodeLabels());
containerBasedPreemptOrKill(root, clusterResources);
}
/**
+ * Setting Node Labels
+ *
+ * @param nodelabels
+ */
+ public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
+ labels = nodelabels;
+ }
+
+ /**
+ * This method returns all non labeled resources.
+ *
+ * @param clusterResources
+ * @return Resources
+ */
+ private Resource getNonLabeledResources(Resource clusterResources) {
+ RMContext rmcontext = scheduler.getRMContext();
+ RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
+ Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+ clusterResources);
+ return res == null ? clusterResources : res;
+ }
+
+ /**
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.
*
@@ -593,7 +624,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param app
* @param clusterResource
* @param rsrcPreempt
- * @return
+ * @return Set<RMContainer> Set of RMContainers
*/
private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Resource rsrcPreempt,
@@ -635,12 +666,26 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
Resources.addTo(skippedAMSize, c.getContainer().getResource());
continue;
}
+ // skip Labeled resource
+ if(isLabeledContainer(c)){
+ continue;
+ }
ret.add(c);
Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
return ret;
}
+
+ /**
+ * Checking if given container is a labeled container
+ *
+ * @param c
+ * @return true/false
+ */
+ private boolean isLabeledContainer(RMContainer c) {
+ return labels.containsKey(c.getAllocatedNode());
+ }
/**
* Compare by reversed priority order first, and then reversed containerId
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a2d3e71/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index ca67ef0..0a147f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -38,27 +38,37 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
+import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -72,12 +82,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mortbay.log.Log;
public class TestProportionalCapacityPreemptionPolicy {
@@ -85,14 +97,18 @@ public class TestProportionalCapacityPreemptionPolicy {
int appAlloc = 0;
boolean setAMContainer = false;
+ boolean setLabeledContainer = false;
float setAMResourcePercent = 0.0f;
Random rand = null;
Clock mClock = null;
Configuration conf = null;
CapacityScheduler mCS = null;
+ RMContext rmContext = null;
+ RMNodeLabelsManager lm = null;
CapacitySchedulerConfiguration schedConf = null;
EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
+ Resource clusterResources = null;
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 0), 0);
final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
@@ -108,6 +124,19 @@ public class TestProportionalCapacityPreemptionPolicy {
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
+ public enum priority {
+ AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
+ int value;
+
+ private priority(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return this.value;
+ }
+ };
+
@Rule public TestName name = new TestName();
@Before
@@ -130,8 +159,12 @@ public class TestProportionalCapacityPreemptionPolicy {
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
+ lm = mock(RMNodeLabelsManager.class);
schedConf = new CapacitySchedulerConfiguration();
when(mCS.getConfiguration()).thenReturn(schedConf);
+ rmContext = mock(RMContext.class);
+ when(mCS.getRMContext()).thenReturn(rmContext);
+ when(rmContext.getNodeLabelManager()).thenReturn(lm);
mDisp = mock(EventHandler.class);
rand = new Random();
long seed = rand.nextLong();
@@ -746,7 +779,51 @@ public class TestProportionalCapacityPreemptionPolicy {
verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
setAMContainer = false;
}
-
+
+ @Test
+ public void testIdealAllocationForLabels() {
+ int[][] qData = new int[][] {
+ // / A B
+ { 80, 40, 40 }, // abs
+ { 80, 80, 80 }, // maxcap
+ { 80, 80, 0 }, // used
+ { 70, 20, 50 }, // pending
+ { 0, 0, 0 }, // reserved
+ { 5, 4, 1 }, // apps
+ { -1, 1, 1 }, // req granularity
+ { 2, 0, 0 }, // subqueues
+ };
+ setAMContainer = true;
+ setLabeledContainer = true;
+ Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
+ NodeId node = NodeId.newInstance("node1", 0);
+ Set<String> labelSet = new HashSet<String>();
+ labelSet.add("x");
+ labels.put(node, labelSet);
+ when(lm.getNodeLabels()).thenReturn(labels);
+ ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+ // Subtracting Label X resources from cluster resources
+ when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+ Resources.clone(Resource.newInstance(80, 0)));
+ clusterResources.setMemory(100);
+ policy.editSchedule();
+
+ // By skipping AM Container and Labeled container, all other 18 containers
+ // of appD will be
+ // preempted
+ verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+ // By skipping AM Container and Labeled container, all other 18 containers
+ // of appC will be
+ // preempted
+ verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+ // rest 4 containers from appB will be preempted
+ verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+ setAMContainer = false;
+ setLabeledContainer = false;
+ }
+
@Test
public void testPreemptSkippedAMContainers() {
int[][] qData = new int[][] {
@@ -846,7 +923,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ParentQueue mRoot = buildMockRootQueue(rand, qData);
when(mCS.getRootQueue()).thenReturn(mRoot);
- Resource clusterResources =
+ clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResource()).thenReturn(clusterResources);
return policy;
@@ -965,7 +1042,8 @@ public class TestProportionalCapacityPreemptionPolicy {
Resource unit = Resource.newInstance(gran, 0);
List<RMContainer> cReserved = new ArrayList<RMContainer>();
for (int i = 0; i < reserved; i += gran) {
- cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
+ cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+ .getValue()));
++cAlloc;
}
when(app.getReservedContainers()).thenReturn(cReserved);
@@ -973,9 +1051,16 @@ public class TestProportionalCapacityPreemptionPolicy {
List<RMContainer> cLive = new ArrayList<RMContainer>();
for (int i = 0; i < used; i += gran) {
if(setAMContainer && i == 0){
- cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
- }else{
- cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+ cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
+ .getValue()));
+ }else if(setLabeledContainer && i ==1){
+ cLive.add(mockContainer(appAttId, cAlloc, unit,
+ priority.LABELEDCONTAINER.getValue()));
+ ++used;
+ }
+ else{
+ cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+ .getValue()));
}
++cAlloc;
}
@@ -984,18 +1069,21 @@ public class TestProportionalCapacityPreemptionPolicy {
}
RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
- Resource r, int priority) {
+ Resource r, int cpriority) {
ContainerId cId = ContainerId.newContainerId(appAttId, id);
Container c = mock(Container.class);
when(c.getResource()).thenReturn(r);
- when(c.getPriority()).thenReturn(Priority.create(priority));
+ when(c.getPriority()).thenReturn(Priority.create(cpriority));
RMContainer mC = mock(RMContainer.class);
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
when(mC.getApplicationAttemptId()).thenReturn(appAttId);
- if(0 == priority){
+ if (priority.AMCONTAINER.getValue() == cpriority) {
when(mC.isAMContainer()).thenReturn(true);
}
+ if (priority.LABELEDCONTAINER.getValue() == cpriority) {
+ when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
+ }
return mC;
}