You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cd...@apache.org on 2013/05/09 23:21:42 UTC
svn commit: r1480778 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-reso...
Author: cdouglas
Date: Thu May 9 21:21:42 2013
New Revision: 1480778
URL: http://svn.apache.org/r1480778
Log:
YARN-568. Add support for work preserving preemption to the FairScheduler.
Contributed by Carlo Curino and Sandy Ryza
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May 9 21:21:42 2013
@@ -10,9 +10,6 @@ Trunk - Unreleased
Azure environments. (See breakdown of tasks below for subtasks and
contributors)
- YARN-45. Add protocol for schedulers to request containers back from
- ApplicationMasters. (Carlo Curino, cdouglas)
-
IMPROVEMENTS
YARN-84. Use Builder to build RPC server. (Brandon Li via suresh)
@@ -129,6 +126,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-482. FS: Extend SchedulingMode to intermediate queues.
(kkambatl via tucu)
+ YARN-45. Add protocol for schedulers to request containers back from
+ ApplicationMasters. (Carlo Curino, cdouglas)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
@@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
+ YARN-568. Add support for work preserving preemption to the FairScheduler.
+ (Carlo Curino and Sandy Ryza via cdouglas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu May 9 21:21:42 2013
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -341,9 +348,65 @@ public class ApplicationMasterService ex
}
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+ // add preemption to the allocateResponse message (if any)
+ allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+
return allocateResponse;
}
}
+
+ private PreemptionMessage generatePreemptionMessage(Allocation allocation){
+ PreemptionMessage pMsg = null;
+ // assemble strict preemption request
+ if (allocation.getStrictContainerPreemptions() != null) {
+ pMsg =
+ recordFactory.newRecordInstance(PreemptionMessage.class);
+ StrictPreemptionContract pStrict =
+ recordFactory.newRecordInstance(StrictPreemptionContract.class);
+ Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+ for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+ PreemptionContainer pc =
+ recordFactory.newRecordInstance(PreemptionContainer.class);
+ pc.setId(cId);
+ pCont.add(pc);
+ }
+ pStrict.setContainers(pCont);
+ pMsg.setStrictContract(pStrict);
+ }
+
+ // assemble negotiable preemption request
+ if (allocation.getResourcePreemptions() != null &&
+ allocation.getResourcePreemptions().size() > 0 &&
+ allocation.getContainerPreemptions() != null &&
+ allocation.getContainerPreemptions().size() > 0) {
+ if (pMsg == null) {
+ pMsg =
+ recordFactory.newRecordInstance(PreemptionMessage.class);
+ }
+ PreemptionContract contract =
+ recordFactory.newRecordInstance(PreemptionContract.class);
+ Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+ for (ContainerId cId : allocation.getContainerPreemptions()) {
+ PreemptionContainer pc =
+ recordFactory.newRecordInstance(PreemptionContainer.class);
+ pc.setId(cId);
+ pCont.add(pc);
+ }
+ List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
+ for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+ PreemptionResourceRequest prr =
+ recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+ prr.setResourceRequest(crr);
+ pRes.add(prr);
+ }
+ contract.setContainers(pCont);
+ contract.setResourceRequest(pRes);
+ pMsg.setContract(contract);
+ }
+
+ return pMsg;
+ }
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response =
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java Thu May 9 21:21:42 2013
@@ -19,17 +19,43 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation {
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
final List<Container> containers;
final Resource resourceLimit;
-
+ final Set<ContainerId> strictContainers;
+ final Set<ContainerId> fungibleContainers;
+ final List<ResourceRequest> fungibleResources;
+
public Allocation(List<Container> containers, Resource resourceLimit) {
+ this(containers, resourceLimit, null, null, null);
+ }
+
+ public Allocation(List<Container> containers, Resource resourceLimit,
+ Set<ContainerId> strictContainers) {
+ this(containers, resourceLimit, strictContainers, null, null);
+ }
+
+ public Allocation(List<Container> containers, Resource resourceLimit,
+ Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+ List<ResourceRequest> fungibleResources) {
this.containers = containers;
this.resourceLimit = resourceLimit;
+ this.strictContainers = strictContainers;
+ this.fungibleContainers = fungibleContainers;
+ this.fungibleResources = fungibleResources;
}
public List<Container> getContainers() {
@@ -39,5 +65,17 @@ public class Allocation {
public Resource getResourceLimit() {
return resourceLimit;
}
-
+
+ public Set<ContainerId> getStrictContainerPreemptions() {
+ return strictContainers;
+ }
+
+ public Set<ContainerId> getContainerPreemptions() {
+ return fungibleContainers;
+ }
+
+ public List<ResourceRequest> getResourcePreemptions() {
+ return fungibleResources;
+ }
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May 9 21:21:42 2013
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -83,7 +84,9 @@ public class FSSchedulerApp extends Sche
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
-
+
+ final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -230,6 +233,9 @@ public class FSSchedulerApp extends Sche
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
+
+ // remove from preemption map if it is completed
+ preemptionMap.remove(rmContainer);
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -572,4 +578,18 @@ public class FSSchedulerApp extends Sche
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
+
+ // Preemption-related methods
+ public void addPreemption(RMContainer container, long time) {
+ assert preemptionMap.get(container) == null;
+ preemptionMap.put(container, time);
+ }
+
+ public Long getContainerPreemptionTime(RMContainer container) {
+ return preemptionMap.get(container);
+ }
+
+ public Set<RMContainer> getPreemptionContainers() {
+ return preemptionMap.keySet();
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May 9 21:21:42 2013
@@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements Re
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
- // How often tasks are preempted (must be longer than a couple
+ // How often tasks are preempted
+ protected long preemptionInterval;
+
+ // ms to wait before force-killing containers (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
- protected long preemptionInterval = 15000;
-
+ protected long waitTimeBeforeKill;
+
+ // Containers whose AMs have been warned that they will be preempted soon.
+ private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -335,34 +344,78 @@ public class FairScheduler implements Re
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
- return c2.getContainer().getPriority().compareTo(
+ int ret = c2.getContainer().getPriority().compareTo(
c1.getContainer().getPriority());
+ if (ret == 0) {
+ return c2.getContainerId().compareTo(c1.getContainerId());
+ }
+ return ret;
}
});
+
+ // Scan down the list of containers we've already warned and kill them
+ // if we need to. Remove any containers from the list that we don't need
+ // or that are no longer running.
+ Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+ Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+ while (warnedIter.hasNext()) {
+ RMContainer container = warnedIter.next();
+ if (container.getState() == RMContainerState.RUNNING &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ warnOrKillContainer(container, apps.get(container), queues.get(container));
+ preemptedThisRound.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ } else {
+ warnedIter.remove();
+ }
+ }
- // Scan down the sorted list of task statuses until we've killed enough
- // tasks, making sure we don't kill too many from any queue
- for (RMContainer container : runningContainers) {
+ // Scan down the rest of the containers until we've preempted enough, making
+ // sure we don't preempt too many from any queue
+ Iterator<RMContainer> runningIter = runningContainers.iterator();
+ while (runningIter.hasNext() &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- sched.getResourceUsage(), sched.getFairShare())) {
- LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
- "res=" + container.getContainer().getResource() +
- ") from queue " + sched.getName());
- ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+ if (!preemptedThisRound.contains(container) &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ sched.getResourceUsage(), sched.getFairShare())) {
+ warnOrKillContainer(container, apps.get(container), sched);
+
+ warnedContainers.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ }
+ }
+ }
+
+ private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+ FSLeafQueue queue) {
+ LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+ "res=" + container.getContainer().getResource() +
+ ") from queue " + queue.getName());
+
+ Long time = app.getContainerPreemptionTime(container);
+
+ if (time != null) {
+ // if we asked for preemption more than waitTimeBeforeKill ms ago,
+ // proceed with kill
+ if (time + waitTimeBeforeKill < clock.getTime()) {
+ ContainerStatus status =
+ SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
-
- toPreempt = Resources.subtract(toPreempt,
- container.getContainer().getResource());
- if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
- toPreempt, Resources.none())) {
- break;
- }
+ LOG.info("Killing container" + container +
+ " (after waiting for premption for " +
+ (clock.getTime() - time) + "ms)");
}
+ } else {
+ // track the request in the FSSchedulerApp itself
+ app.addPreemption(container, clock.getTime());
}
}
@@ -487,11 +540,11 @@ public class FairScheduler implements Re
return clusterCapacity;
}
- public Clock getClock() {
+ public synchronized Clock getClock() {
return clock;
}
- protected void setClock(Clock clock) {
+ protected synchronized void setClock(Clock clock) {
this.clock = clock;
}
@@ -746,10 +799,18 @@ public class FairScheduler implements Re
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
- }
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ + " container(s)");
+ }
+
+ Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+ for (RMContainer container : application.getPreemptionContainers()) {
+ preemptionContainerIds.add(container.getContainerId());
+ }
+
return new Allocation(application.pullNewlyAllocatedContainers(),
- application.getHeadroom());
+ application.getHeadroom(), preemptionContainerIds);
}
}
@@ -950,7 +1011,9 @@ public class FairScheduler implements Re
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
-
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
if (!initialized) {
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu May 9 21:21:42 2013
@@ -52,6 +52,11 @@ public class FairSchedulerConfiguration
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
+
+ protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+ protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+ protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+ protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/** Whether to assign multiple containers in one check-in. */
protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -120,4 +125,12 @@ public class FairSchedulerConfiguration
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
+
+ public int getPreemptionInterval() {
+ return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+ }
+
+ public int getWaitTimeBeforeKill() {
+ return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480778&r1=1480777&r2=1480778&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 9 21:21:42 2013
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -891,9 +892,16 @@ public class TestFairScheduler {
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = createConfiguration();
+
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
@@ -988,15 +996,38 @@ public class TestFairScheduler {
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+
+ // First verify that containers are being added to each application's preemption list
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+ scheduler.applications.get(app3).getPreemptionContainers()));
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+ scheduler.applications.get(app6).getPreemptionContainers()));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // At this point the containers should have been killed (since we are not simulating an AM)
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
+
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());