You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/05/10 00:46:41 UTC
svn commit: r1480824 [2/2] - in
/hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Thu May 9 22:46:39 2013
@@ -23,11 +23,12 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -57,9 +58,12 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
+@Private
+@Unstable
public class FSSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
@@ -83,7 +87,9 @@ public class FSSchedulerApp extends Sche
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
-
+
+ final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -230,6 +236,9 @@ public class FSSchedulerApp extends Sche
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
+
+ // remove from preemption map if it is completed
+ preemptionMap.remove(rmContainer);
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -306,8 +315,7 @@ public class FSSchedulerApp extends Sche
* Used only by unit tests
* @return total current reservations
*/
- @Stable
- @Private
+ @VisibleForTesting
public synchronized Resource getCurrentReservation() {
return currentReservation;
}
@@ -572,4 +580,18 @@ public class FSSchedulerApp extends Sche
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
+
+ // Preemption related methods
+ public void addPreemption(RMContainer container, long time) {
+ assert preemptionMap.get(container) == null;
+ preemptionMap.put(container, time);
+ }
+
+ public Long getContainerPreemptionTime(RMContainer container) {
+ return preemptionMap.get(container);
+ }
+
+ public Set<RMContainer> getPreemptionContainers() {
+ return preemptionMap.keySet();
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java Thu May 9 22:46:39 2013
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+@Private
+@Unstable
public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu May 9 22:46:39 2013
@@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements Re
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
- // How often tasks are preempted (must be longer than a couple
+ // How often tasks are preempted
+ protected long preemptionInterval;
+
+ // ms to wait before force killing stuff (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
- protected long preemptionInterval = 15000;
-
+ protected long waitTimeBeforeKill;
+
+ // Containers whose AMs have been warned that they will be preempted soon.
+ private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -225,10 +234,6 @@ public class FairScheduler implements Re
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeShares();
-
- // Update recorded capacity of root queue (child queues are updated
- // when fair share is calculated).
- rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
}
/**
@@ -335,34 +340,78 @@ public class FairScheduler implements Re
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
- return c2.getContainer().getPriority().compareTo(
+ int ret = c2.getContainer().getPriority().compareTo(
c1.getContainer().getPriority());
+ if (ret == 0) {
+ return c2.getContainerId().compareTo(c1.getContainerId());
+ }
+ return ret;
}
});
+
+ // Scan down the list of containers we've already warned and kill them
+ // if we need to. Remove any containers from the list that we don't need
+ // or that are no longer running.
+ Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+ Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+ while (warnedIter.hasNext()) {
+ RMContainer container = warnedIter.next();
+ if (container.getState() == RMContainerState.RUNNING &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ warnOrKillContainer(container, apps.get(container), queues.get(container));
+ preemptedThisRound.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ } else {
+ warnedIter.remove();
+ }
+ }
- // Scan down the sorted list of task statuses until we've killed enough
- // tasks, making sure we don't kill too many from any queue
- for (RMContainer container : runningContainers) {
+ // Scan down the rest of the containers until we've preempted enough, making
+ // sure we don't preempt too many from any queue
+ Iterator<RMContainer> runningIter = runningContainers.iterator();
+ while (runningIter.hasNext() &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- sched.getResourceUsage(), sched.getFairShare())) {
- LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
- "res=" + container.getContainer().getResource() +
- ") from queue " + sched.getName());
- ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+ if (!preemptedThisRound.contains(container) &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ sched.getResourceUsage(), sched.getFairShare())) {
+ warnOrKillContainer(container, apps.get(container), sched);
+
+ warnedContainers.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ }
+ }
+ }
+
+ private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+ FSLeafQueue queue) {
+ LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+ "res=" + container.getContainer().getResource() +
+ ") from queue " + queue.getName());
+
+ Long time = app.getContainerPreemptionTime(container);
+
+ if (time != null) {
+ // if we asked for preemption more than waitTimeBeforeKill ms ago,
+ // proceed with kill
+ if (time + waitTimeBeforeKill < clock.getTime()) {
+ ContainerStatus status =
+ SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
-
- toPreempt = Resources.subtract(toPreempt,
- container.getContainer().getResource());
- if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
- toPreempt, Resources.none())) {
- break;
- }
+ LOG.info("Killing container " + container +
+ " (after waiting for preemption for " +
+ (clock.getTime() - time) + "ms)");
}
+ } else {
+ // track the request in the FSSchedulerApp itself
+ app.addPreemption(container, clock.getTime());
}
}
@@ -487,11 +536,11 @@ public class FairScheduler implements Re
return clusterCapacity;
}
- public Clock getClock() {
+ public synchronized Clock getClock() {
return clock;
}
- protected void setClock(Clock clock) {
+ protected synchronized void setClock(Clock clock) {
this.clock = clock;
}
@@ -617,6 +666,7 @@ public class FairScheduler implements Re
} else {
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
+ updateRootQueueMetrics();
}
LOG.info("Application " + applicationAttemptId +
@@ -628,6 +678,7 @@ public class FairScheduler implements Re
private synchronized void addNode(RMNode node) {
nodes.put(node.getNodeID(), new FSSchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability());
+ updateRootQueueMetrics();
LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterCapacity);
@@ -636,6 +687,7 @@ public class FairScheduler implements Re
private synchronized void removeNode(RMNode rmNode) {
FSSchedulerNode node = nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
+ updateRootQueueMetrics();
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
@@ -746,10 +798,18 @@ public class FairScheduler implements Re
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
- }
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ + " container(s)");
+ }
+
+ Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+ for (RMContainer container : application.getPreemptionContainers()) {
+ preemptionContainerIds.add(container.getContainerId());
+ }
+
return new Allocation(application.pullNewlyAllocatedContainers(),
- application.getHeadroom());
+ application.getHeadroom(), preemptionContainerIds);
}
}
@@ -832,6 +892,7 @@ public class FairScheduler implements Re
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
queueMgr.getRootQueue().assignContainer(node),
Resources.none())) {
+ assignedContainers++;
assignedContainer = true;
}
if (!assignedContainer) { break; }
@@ -839,6 +900,7 @@ public class FairScheduler implements Re
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
+ updateRootQueueMetrics();
}
@Override
@@ -860,6 +922,18 @@ public class FairScheduler implements Re
}
return new SchedulerAppReport(applications.get(appAttemptId));
}
+
+ /**
+ * Subqueue metrics might be a little out of date because fair shares are
+ * recalculated at the update interval, but the root queue metrics needs to
+ * be updated synchronously with allocations and completions so that cluster
+ * metrics will be consistent.
+ */
+ private void updateRootQueueMetrics() {
+ rootMetrics.setAvailableResourcesToQueue(
+ Resources.subtract(
+ clusterCapacity, rootMetrics.getAllocatedResources()));
+ }
@Override
public QueueMetrics getRootQueueMetrics() {
@@ -950,7 +1024,9 @@ public class FairScheduler implements Re
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
-
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
if (!initialized) {
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu May 9 22:46:39 2013
@@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
-
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+@Private
+@Evolving
public class FairSchedulerConfiguration extends Configuration {
public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
@@ -52,6 +55,11 @@ public class FairSchedulerConfiguration
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
+
+ protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+ protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+ protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+ protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/** Whether to assign multiple containers in one check-in. */
protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -120,4 +128,12 @@ public class FairSchedulerConfiguration
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
+
+ public int getPreemptionInterval() {
+ return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+ }
+
+ public int getWaitTimeBeforeKill() {
+ return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Thu May 9 22:46:39 2013
@@ -22,14 +22,14 @@ import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@Public
-@Unstable
+@Evolving
public abstract class SchedulingPolicy {
private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Thu May 9 22:46:39 2013
@@ -21,6 +21,8 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
@@ -29,6 +31,8 @@ import org.apache.hadoop.yarn.server.res
import com.google.common.annotations.VisibleForTesting;
+@Private
+@Unstable
public class FairSharePolicy extends SchedulingPolicy {
@VisibleForTesting
public static final String NAME = "Fairshare";
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Thu May 9 22:46:39 2013
@@ -21,6 +21,8 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
@@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.res
import com.google.common.annotations.VisibleForTesting;
+@Private
+@Unstable
public class FifoPolicy extends SchedulingPolicy {
@VisibleForTesting
public static final String NAME = "FIFO";
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Thu May 9 22:46:39 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Map;
@@ -199,6 +200,8 @@ public class MockRM extends ResourceMana
return client.submitApplication(req);
} catch (YarnRemoteException e) {
e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
}
return null;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Thu May 9 22:46:39 2013
@@ -339,7 +339,7 @@ public class TestClientRMTokens {
DelegationToken token = loggedInUser
.doAs(new PrivilegedExceptionAction<DelegationToken>() {
@Override
- public DelegationToken run() throws YarnRemoteException {
+ public DelegationToken run() throws YarnRemoteException, IOException {
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
@@ -355,7 +355,7 @@ public class TestClientRMTokens {
throws IOException, InterruptedException {
long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
@Override
- public Long run() throws YarnRemoteException {
+ public Long run() throws YarnRemoteException, IOException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
@@ -371,7 +371,7 @@ public class TestClientRMTokens {
throws IOException, InterruptedException {
loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
- public Void run() throws YarnRemoteException {
+ public Void run() throws YarnRemoteException, IOException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Thu May 9 22:46:39 2013
@@ -66,20 +66,20 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
- metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+ metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
- checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
+ checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB));
- checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
- metrics.releaseResources(user, 1, Resources.createResource(2*GB));
- checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+ checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -148,25 +148,25 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0);
checkApps(userSource, 1, 1, 0, 0, 0, 0);
- metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
- metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+ metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+ metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
- checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
- checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+ checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+ checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB));
- checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 0, 0);
- checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 0, 0);
-
- metrics.releaseResources(user, 1, Resources.createResource(2*GB));
- checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
- checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+ checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+ checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+ checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -197,35 +197,35 @@ public class TestQueueMetrics {
checkApps(userSource, 1, 1, 0, 0, 0, 0);
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
- parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
- metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB));
- parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
- metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB));
- metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
- checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
- checkResources(parentQueueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
- checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
- checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
+ parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+ metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
+ parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+ metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
+ metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
+ checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+ checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
+ checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
+ checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
metrics.incrAppsRunning(app, user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
checkApps(userSource, 1, 0, 1, 0, 0, 0);
- metrics.allocateResources(user, 3, Resources.createResource(2*GB));
- metrics.reserveResource(user, Resources.createResource(3*GB));
+ metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
+ metrics.reserveResource(user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
- checkResources(queueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
- checkResources(parentQueueSource, 6*GB, 3, 3, 0, 100*GB, 9*GB, 2, 3*GB, 1);
- checkResources(userSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
- checkResources(parentUserSource, 6*GB, 3, 3, 0, 10*GB, 9*GB, 2, 3*GB, 1);
-
- metrics.releaseResources(user, 1, Resources.createResource(2*GB));
- metrics.unreserveResource(user, Resources.createResource(3*GB));
- checkResources(queueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
- checkResources(parentQueueSource, 4*GB, 2, 3, 1, 100*GB, 9*GB, 2, 0, 0);
- checkResources(userSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
- checkResources(parentUserSource, 4*GB, 2, 3, 1, 10*GB, 9*GB, 2, 0, 0);
+ checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+ checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
+ checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+ checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
+
+ metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+ metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
+ checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+ checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
+ checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
+ checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
metrics.finishApp(app, RMAppAttemptState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0);
@@ -277,18 +277,23 @@ public class TestQueueMetrics {
}
public static void checkResources(MetricsSource source, int allocatedMB,
- int allocCtnrs, long aggreAllocCtnrs, long aggreReleasedCtnrs,
- int availableMB, int pendingMB, int pendingCtnrs,
- int reservedMB, int reservedCtnrs) {
+ int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+ long aggreReleasedCtnrs, int availableMB, int availableCores, int pendingMB,
+ int pendingCores, int pendingCtnrs, int reservedMB, int reservedCores,
+ int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
+ assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
+ assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
+ assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb);
+ assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 9 22:46:39 2013
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
@@ -127,6 +129,7 @@ public class TestFairScheduler {
public void tearDown() {
scheduler = null;
resourceManager = null;
+ QueueMetrics.clearQueueMetrics();
}
private Configuration createConfiguration() {
@@ -336,6 +339,13 @@ public class TestFairScheduler {
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
+
+ // verify metrics
+ QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
+ .getMetrics();
+ assertEquals(1024, queue1Metrics.getAllocatedMB());
+ assertEquals(1024, scheduler.getRootQueueMetrics().getAllocatedMB());
+ assertEquals(512, scheduler.getRootQueueMetrics().getAvailableMB());
}
@Test (timeout = 5000)
@@ -891,9 +901,16 @@ public class TestFairScheduler {
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = createConfiguration();
+
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
@@ -988,15 +1005,38 @@ public class TestFairScheduler {
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+
+ // First verify we are adding containers to preemption list for the application
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+ scheduler.applications.get(app3).getPreemptionContainers()));
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+ scheduler.applications.get(app6).getPreemptionContainers()));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // At this point the containers should have been killed (since we are not simulating AM)
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
+
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
@@ -1245,6 +1285,7 @@ public class TestFairScheduler {
scheduler.handle(updateEvent);
assertEquals(1, app.getLiveContainers().size());
+ assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
// Create request at higher priority
createSchedulingRequestExistingApplication(1024, 1, attId);
@@ -1260,6 +1301,7 @@ public class TestFairScheduler {
// Complete container
scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
Arrays.asList(containerId));
+ assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
// Schedule at opening
scheduler.update();
@@ -1271,6 +1313,7 @@ public class TestFairScheduler {
for (RMContainer liveContainer : liveContainers) {
Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
}
+ assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
}
@Test
@@ -1382,6 +1425,37 @@ public class TestFairScheduler {
assertEquals(1, app2.getLiveContainers().size());
}
+ @Test(timeout = 3000)
+ public void testMaxAssign() throws AllocationConfigurationException {
+ // set required scheduler configs
+ scheduler.assignMultiple = true;
+ scheduler.getQueueManager().getLeafQueue("root.default")
+ .setPolicy(SchedulingPolicy.getDefault());
+
+ RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384));
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ ApplicationAttemptId attId =
+ createSchedulingRequest(1024, "root.default", "user", 8);
+ FSSchedulerApp app = scheduler.applications.get(attId);
+
+ // set maxAssign to 2: only 2 containers should be allocated
+ scheduler.maxAssign = 2;
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Incorrect number of containers allocated", 2, app
+ .getLiveContainers().size());
+
+ // set maxAssign to -1: all remaining containers should be allocated
+ scheduler.maxAssign = -1;
+ scheduler.update();
+ scheduler.handle(updateEvent);
+ assertEquals("Incorrect number of containers allocated", 8, app
+ .getLiveContainers().size());
+ }
+
/**
* Test to verify the behavior of
* {@link FSQueue#assignContainer(FSSchedulerNode)})
@@ -1544,4 +1618,24 @@ public class TestFairScheduler {
assertEquals(1, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
}
+
+ @Test
+ public void testRemoveNodeUpdatesRootQueueMetrics() {
+ assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+ NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(addEvent);
+
+ assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+ scheduler.update(); // update shouldn't change things
+ assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
+
+ NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1);
+ scheduler.handle(removeEvent);
+
+ assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+ scheduler.update(); // update shouldn't change things
+ assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java Thu May 9 22:46:39 2013
@@ -485,7 +485,8 @@ public class TestContainerManagerSecurit
}
private Container requestAndGetContainer(AMRMProtocol scheduler,
- ApplicationId appID) throws YarnRemoteException, InterruptedException {
+ ApplicationId appID) throws YarnRemoteException, InterruptedException,
+ IOException {
// Request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Thu May 9 22:46:39 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.webproxy;
+import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
@@ -78,9 +79,10 @@ public class AppReportFetcher {
* @param appId the id of the application to get.
* @return the ApplicationReport for that app.
* @throws YarnRemoteException on any error.
+ * @throws IOException
*/
public ApplicationReport getApplicationReport(ApplicationId appId)
- throws YarnRemoteException {
+ throws YarnRemoteException, IOException {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);