You are viewing a plain-text version of this content; the link to its canonical version ("here" in the original page) was not preserved in this copy.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [14/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,10 +39,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -49,19 +50,13 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -69,16 +64,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -96,17 +90,20 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class CapacityScheduler extends AbstractYarnScheduler
- implements PreemptableResourceScheduler, CapacitySchedulerContext,
- Configurable {
+public class CapacityScheduler extends
+ AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
+ PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private CSQueue root;
+  /** Maximum time, in milliseconds, serviceStop() waits for the async scheduling thread. */
+  protected static final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
@Override
@@ -182,17 +179,7 @@ public class CapacityScheduler extends A
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
- private Map<NodeId, FiCaSchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
- private Resource clusterResource =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
- private int numNodeManagers = 0;
-
- private Resource minimumAllocation;
- private Resource maximumAllocation;
-
- private boolean initialized = false;
+ private AtomicInteger numNodeManagers = new AtomicInteger(0);
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -209,7 +196,19 @@ public class CapacityScheduler extends A
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
- public CapacityScheduler() {}
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = null;
+ private Groups groups;
+
+  /**
+   * Test-only accessor for the user/group-to-queue mapping lookup.
+   *
+   * NOTE(review): the lookup iterates the installed mappings, which are null
+   * until the queues have been initialized — call only after init.
+   *
+   * @param user the submitting user
+   * @return the queue selected by the configured mappings, or {@code null}
+   *     when no mapping applies
+   * @throws IOException propagated from the user's group lookup
+   */
+  @VisibleForTesting
+  public synchronized String getMappedQueueForTest(String user)
+      throws IOException {
+    final String mappedQueue = getMappedQueue(user);
+    return mappedQueue;
+  }
+
+  /**
+   * Creates an uninitialized scheduler. The fully-qualified class name is
+   * handed to the superclass constructor (presumably as the service name —
+   * confirm against AbstractYarnScheduler).
+   */
+  public CapacityScheduler() {
+    super(CapacityScheduler.class.getName());
+  }
@Override
public QueueMetrics getRootQueueMetrics() {
@@ -231,16 +230,6 @@ public class CapacityScheduler extends A
}
@Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
- @Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
- }
-
- @Override
public Comparator<FiCaSchedulerApp> getApplicationComparator() {
return applicationComparator;
}
@@ -256,66 +245,95 @@ public class CapacityScheduler extends A
}
@Override
- public synchronized int getNumClusterNodes() {
- return numNodeManagers;
+ public int getNumClusterNodes() {
+ return numNodeManagers.get();
}
@Override
- public RMContext getRMContext() {
+ public synchronized RMContext getRMContext() {
return this.rmContext;
}
@Override
- public Resource getClusterResources() {
- return clusterResource;
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
}
-
+
+  /**
+   * Builds the scheduler's state from {@code configuration}.
+   *
+   * Loads and validates the capacity-scheduler configuration, records the
+   * allocation bounds, resource calculator and node-naming policy, creates
+   * the application map, sets up the queue hierarchy, and reads the
+   * asynchronous-scheduling settings. When asynchronous scheduling is
+   * enabled the scheduling thread is only created here; starting it is left
+   * to {@link #startSchedulerThreads()}.
+   *
+   * @param configuration configuration to load the scheduler settings from
+   * @throws IOException propagated from the initialization steps (e.g. queue
+   *     setup)
+   */
+  private synchronized void initScheduler(Configuration configuration) throws
+      IOException {
+    conf = loadCapacitySchedulerConfiguration(configuration);
+    validateConf(conf);
+
+    // Keep these ahead of initializeQueues(); NOTE(review): queue setup may
+    // read them back through this scheduler, so the order matters.
+    minimumAllocation = conf.getMinimumAllocation();
+    maximumAllocation = conf.getMaximumAllocation();
+    calculator = conf.getResourceCalculator();
+    usePortForNodeName = conf.getUsePortForNodeName();
+    applications = new ConcurrentHashMap<ApplicationId,
+        SchedulerApplication<FiCaSchedulerApp>>();
+    initializeQueues(conf);
+
+    // Asynchronous scheduling: create (but do not start) the thread.
+    scheduleAsynchronously = conf.getScheduleAynschronously();
+    asyncScheduleInterval = conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+        DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+    if (scheduleAsynchronously) {
+      asyncSchedulerThread = new AsyncScheduleThread(this);
+    }
+
+    LOG.info("Initialized CapacityScheduler with "
+        + "calculator=" + getResourceCalculator().getClass() + ", "
+        + "minimumAllocation=<" + getMinimumResourceCapability() + ">, "
+        + "maximumAllocation=<" + getMaximumResourceCapability() + ">, "
+        + "asynchronousScheduling=" + scheduleAsynchronously + ", "
+        + "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+  }
+
+  /**
+   * Starts the asynchronous scheduling thread created during initialization.
+   * Does nothing when asynchronous scheduling is disabled.
+   *
+   * @throws NullPointerException if asynchronous scheduling is enabled but
+   *     no thread was created
+   */
+  private synchronized void startSchedulerThreads() {
+    if (!scheduleAsynchronously) {
+      return;
+    }
+    Preconditions.checkNotNull(asyncSchedulerThread,
+        "asyncSchedulerThread is null");
+    asyncSchedulerThread.start();
+  }
+
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf);
- if (!initialized) {
- this.rmContext = rmContext;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- this.minimumAllocation = this.conf.getMinimumAllocation();
- this.maximumAllocation = this.conf.getMaximumAllocation();
- this.calculator = this.conf.getResourceCalculator();
- this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+ initScheduler(configuration);
+ super.serviceInit(conf);
+ }
- initializeQueues(this.conf);
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval =
- this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
- if (scheduleAsynchronously) {
- asyncSchedulerThread = new AsyncScheduleThread(this);
- asyncSchedulerThread.start();
- }
-
- initialized = true;
- LOG.info("Initialized CapacityScheduler with " +
- "calculator=" + getResourceCalculator().getClass() + ", " +
- "minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
- "asynchronousScheduling=" + scheduleAsynchronously + ", " +
- "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
-
- } else {
- CapacitySchedulerConfiguration oldConf = this.conf;
- this.conf = loadCapacitySchedulerConfiguration(configuration);
- validateConf(this.conf);
- try {
- LOG.info("Re-initializing queues...");
- reinitializeQueues(this.conf);
- } catch (Throwable t) {
- this.conf = oldConf;
- throw new IOException("Failed to re-init queues", t);
+  /**
+   * Starts the scheduler's own threads first, then runs the superclass start
+   * sequence.
+   */
+  @Override
+  public void serviceStart() throws Exception {
+    startSchedulerThreads();
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the asynchronous scheduling thread (if one exists), then runs the
+   * superclass stop sequence.
+   *
+   * The thread reference is read under the scheduler lock, but the
+   * interrupt and the bounded join (up to THREAD_JOIN_TIMEOUT_MS) happen
+   * after the lock is released, so the wait does not block other
+   * synchronized scheduler methods.
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    Thread schedulerThread = null;
+    synchronized (this) {
+      if (scheduleAsynchronously && asyncSchedulerThread != null) {
+        schedulerThread = asyncSchedulerThread;
}
}
+    if (schedulerThread != null) {
+      schedulerThread.interrupt();
+      schedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Reloads the capacity-scheduler configuration and re-initializes the
+   * queue hierarchy from it.
+   *
+   * The new configuration is loaded and validated before it replaces
+   * {@code this.conf}, so a configuration rejected by validation is never
+   * installed. If re-initializing the queues fails, only the configuration
+   * reference is restored to the previous one and the failure is wrapped in
+   * an {@link IOException}.
+   *
+   * @param conf configuration to derive the new scheduler configuration
+   *     from; it is copied, not modified
+   * @param rmContext not used by this implementation
+   * @throws IOException if the queues cannot be re-initialized
+   */
+  @Override
+  public synchronized void
+  reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+    Configuration configuration = new Configuration(conf);
+    CapacitySchedulerConfiguration oldConf = this.conf;
+    CapacitySchedulerConfiguration newConf =
+        loadCapacitySchedulerConfiguration(configuration);
+    // Validate before publishing so a rejected configuration never becomes
+    // this.conf.
+    validateConf(newConf);
+    this.conf = newConf;
+    try {
+      LOG.info("Re-initializing queues...");
+      reinitializeQueues(this.conf);
+    } catch (Throwable t) {
+      this.conf = oldConf;
+      throw new IOException("Failed to re-init queues", t);
+    }
}
long getAsyncScheduleInterval() {
@@ -390,7 +408,32 @@ public class CapacityScheduler extends A
}
}
private static final QueueHook noop = new QueueHook();
-
+
+  /**
+   * Reads the user/group-to-queue mappings and the mapping-override flag
+   * from the current configuration, and installs them.
+   *
+   * Every mapping whose target is a concrete queue name (not the %user or
+   * %primary_group placeholder) must name an existing leaf queue. All
+   * validation happens before any field is written, so a rejected mapping
+   * set leaves the previously installed mappings and override flag intact.
+   * A {@link Groups} resolver is (re)created only when mappings are present.
+   *
+   * @throws IOException if a mapping targets a missing or non-leaf queue
+   */
+  private void initializeQueueMappings() throws IOException {
+    boolean newOverride = conf.getOverrideWithQueueMappings();
+    // Get new user/group mappings
+    List<QueueMapping> newMappings = conf.getQueueMappings();
+    // Check that every concrete target refers to an existing leaf queue
+    for (QueueMapping mapping : newMappings) {
+      if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+          !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+        CSQueue queue = queues.get(mapping.queue);
+        // instanceof is false for null, covering the "missing" case too
+        if (!(queue instanceof LeafQueue)) {
+          throw new IOException(
+              "mapping contains invalid or non-leaf queue " + mapping.queue);
+        }
+      }
+    }
+    // Apply the new settings only now that they are known to be valid
+    overrideWithQueueMappings = newOverride;
+    mappings = newMappings;
+    LOG.info("Initialized queue mappings, override: "
+        + overrideWithQueueMappings);
+    // initialize groups if mappings are present
+    if (mappings.size() > 0) {
+      groups = new Groups(conf);
+    }
+  }
+
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@@ -398,7 +441,9 @@ public class CapacityScheduler extends A
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
+
LOG.info("Initialized root queue " + root);
+ initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@@ -418,6 +463,7 @@ public class CapacityScheduler extends A
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
+ initializeQueueMappings();
}
/**
@@ -505,12 +551,73 @@ public class CapacityScheduler extends A
}
synchronized CSQueue getQueue(String queueName) {
+ if (queueName == null) {
+ return null;
+ }
return queues.get(queueName);
}
+  private static final String CURRENT_USER_MAPPING = "%user";
+
+  private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+  /**
+   * Resolves the queue that the configured mappings assign to {@code user}.
+   *
+   * Mappings are tried in order and the first match wins:
+   * <ul>
+   * <li>a user mapping with source %user matches everyone and yields the
+   * user's name (target %user), the first of the user's groups (target
+   * %primary_group), or the literal target queue;</li>
+   * <li>any other user mapping matches when its source equals
+   * {@code user};</li>
+   * <li>a group mapping matches when its source is one of the user's
+   * groups.</li>
+   * </ul>
+   *
+   * @param user the submitting user
+   * @return the mapped queue name, or {@code null} if nothing matches
+   * @throws IOException propagated from the group lookup
+   */
+  private String getMappedQueue(String user) throws IOException {
+    for (QueueMapping candidate : mappings) {
+      if (candidate.type == MappingType.USER) {
+        if (candidate.source.equals(CURRENT_USER_MAPPING)) {
+          // Wildcard user mapping: it always decides the outcome.
+          if (candidate.queue.equals(CURRENT_USER_MAPPING)) {
+            return user;
+          }
+          if (candidate.queue.equals(PRIMARY_GROUP_MAPPING)) {
+            return groups.getGroups(user).get(0);
+          }
+          return candidate.queue;
+        }
+        if (user.equals(candidate.source)) {
+          return candidate.queue;
+        }
+      } else if (candidate.type == MappingType.GROUP) {
+        if (groups.getGroups(user).contains(candidate.source)) {
+          return candidate.queue;
+        }
+      }
+    }
+    return null;
+  }
+
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
- // santiy checks.
+ String queueName, String user, boolean isAppRecovering) {
+
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ String mappedQueue = getMappedQueue(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ || overrideWithQueueMappings) {
+ LOG.info("Application " + applicationId + " user " + user
+ + " mapping [" + queueName + "] to [" + mappedQueue
+ + "] override " + overrideWithQueueMappings);
+ queueName = mappedQueue;
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ rmApp.setQueue(queueName);
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
+ return;
+ }
+ }
+
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@@ -536,19 +643,28 @@ public class CapacityScheduler extends A
.handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return;
}
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
+ // update the metrics
+ queue.getMetrics().submitApp(user);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ new SchedulerApplication<FiCaSchedulerApp>(queue, user);
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
private synchronized void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ boolean transferStateFromPreviousAttempt,
+ boolean isAttemptRecovering) {
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
CSQueue queue = (CSQueue) application.getQueue();
@@ -565,14 +681,22 @@ public class CapacityScheduler extends A
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
- rmContext.getDispatcher().getEventHandler() .handle(
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
}
private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ applications.get(applicationId);
if (application == null){
// The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
// ignore it.
@@ -597,7 +721,7 @@ public class CapacityScheduler extends A
" finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
@@ -659,25 +783,11 @@ public class CapacityScheduler extends A
// Sanity check
SchedulerUtils.normalizeRequests(
- ask, getResourceCalculator(), getClusterResources(),
+ ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), maximumAllocation);
// Release containers
- for (ContainerId releasedContainerId : release) {
- RMContainer rmContainer = getRMContainer(releasedContainerId);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "CapacityScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
- }
- completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainerId,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
@@ -757,7 +867,10 @@ public class CapacityScheduler extends A
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
- SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
+ if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
+ LOG)) {
+ root.updateClusterResource(clusterResource);
+ }
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -822,7 +935,7 @@ public class CapacityScheduler extends A
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
- if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+ if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
node.getAvailableResource(), minimumAllocation)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
@@ -839,21 +952,6 @@ public class CapacityScheduler extends A
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
@@ -861,6 +959,8 @@ public class CapacityScheduler extends A
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
@@ -883,7 +983,8 @@ public class CapacityScheduler extends A
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(),
+ appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@@ -898,7 +999,8 @@ public class CapacityScheduler extends A
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -932,25 +1034,25 @@ public class CapacityScheduler extends A
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
- ++numNodeManagers;
+ int numNodes = numNodeManagers.incrementAndGet();
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
- if (scheduleAsynchronously && numNodeManagers == 1) {
+ if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
}
private synchronized void removeNode(RMNode nodeInfo) {
- FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
}
Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
root.updateClusterResource(clusterResource);
- --numNodeManagers;
+ int numNodes = numNodeManagers.decrementAndGet();
- if (scheduleAsynchronously && numNodeManagers == 0) {
+ if (scheduleAsynchronously && numNodes == 0) {
asyncSchedulerThread.suspendSchedule();
}
@@ -980,7 +1082,8 @@ public class CapacityScheduler extends A
}
@Lock(CapacityScheduler.class)
- private synchronized void completedContainer(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -1015,28 +1118,10 @@ public class CapacityScheduler extends A
@Lock(Lock.NoLock.class)
@VisibleForTesting
- public FiCaSchedulerApp getApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : new SchedulerAppReport(app);
- }
-
@Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
+ public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : app.getResourceUsageReport();
+ return super.getApplicationAttempt(applicationAttemptId);
}
@Lock(Lock.NoLock.class)
@@ -1048,24 +1133,6 @@ public class CapacityScheduler extends A
Map<NodeId, FiCaSchedulerNode> getAllNodes() {
return nodes;
}
-
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- @VisibleForTesting
- public FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
@Override
@Lock(Lock.NoLock.class)
@@ -1074,12 +1141,6 @@ public class CapacityScheduler extends A
}
@Override
- public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FiCaSchedulerNode node = getNode(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
- @Override
public void dropContainerReservation(RMContainer container) {
if(LOG.isDebugEnabled()){
LOG.debug("DROP_RESERVATION:" + container.toString());
@@ -1105,14 +1166,13 @@ public class CapacityScheduler extends A
@Override
public void killContainer(RMContainer cont) {
- if(LOG.isDebugEnabled()){
+ if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString());
}
- completedContainer(cont,
- SchedulerUtils.createPreemptedContainerStatus(
- cont.getContainerId(),"Container being forcibly preempted:"
- + cont.getContainerId()),
- RMContainerEventType.KILL);
+ recoverResourceRequestForContainer(cont);
+ completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
+ cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
+ RMContainerEventType.KILL);
}
@Override
@@ -1156,4 +1216,59 @@ public class CapacityScheduler extends A
throw new IOException(e);
}
}
+
+ @Override
+ public synchronized String moveApplication(ApplicationId appId,
+ String targetQueueName) throws YarnException {
+ FiCaSchedulerApp app =
+ getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0));
+ String sourceQueueName = app.getQueue().getQueueName();
+ LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+ LeafQueue dest = getAndCheckLeafQueue(targetQueueName);
+ // Validation check - ACLs, submission limits for user & queue
+ String user = app.getUser();
+ try {
+ dest.submitApplication(appId, user, targetQueueName);
+ } catch (AccessControlException e) {
+ throw new YarnException(e);
+ }
+ // Move all live containers
+ for (RMContainer rmContainer : app.getLiveContainers()) {
+ source.detachContainer(clusterResource, app, rmContainer);
+ // attach the Container to another queue
+ dest.attachContainer(clusterResource, app, rmContainer);
+ }
+ // Detach the application..
+ source.finishApplicationAttempt(app, sourceQueueName);
+ source.getParent().finishApplication(appId, app.getUser());
+ // Finish app & update metrics
+ app.move(dest);
+ // Submit to a new queue
+ dest.submitApplicationAttempt(app, user);
+ applications.get(appId).setQueue(dest);
+ LOG.info("App: " + app.getApplicationId() + " successfully moved from "
+ + sourceQueueName + " to: " + targetQueueName);
+ return targetQueueName;
+ }
+
+  /**
+   * Looks up {@code queue} and checks that it names an existing leaf queue.
+   *
+   * @param queue name of the queue to look up; {@code null} is reported as a
+   *     missing queue
+   * @return the matching leaf queue
+   * @throws YarnException if no such queue exists, or it is not a leaf queue
+   */
+  private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+    CSQueue found = getQueue(queue);
+    if (found instanceof LeafQueue) {
+      return (LeafQueue) found;
+    }
+    if (found == null) {
+      throw new YarnException("The specified Queue: " + queue
+          + " doesn't exist");
+    }
+    throw new YarnException("The specified Queue: " + queue
+        + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
+  }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Tue Aug 19 23:49:39 2014
@@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+ @Private
+ public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+ @Private
+ public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+ @Private
+ public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+  /**
+   * One entry of the queue-mapping configuration: applications submitted by
+   * the principal named in {@code source} (a user or a group, depending on
+   * {@code type}) are routed to {@code queue}.
+   */
+  @Private
+  public static class QueueMapping {
+
+    /** Kind of principal a mapping applies to, with its one-letter prefix. */
+    public enum MappingType {
+
+      USER("u"),
+      GROUP("g");
+
+      private final String type;
+
+      private MappingType(String type) {
+        this.type = type;
+      }
+
+      /** @return the one-letter prefix ("u" or "g") */
+      @Override
+      public String toString() {
+        return type;
+      }
+    }
+
+    // whether source names a user or a group
+    MappingType type;
+    // user or group name the mapping matches
+    String source;
+    // queue the matching applications are routed to
+    String queue;
+
+    public QueueMapping(MappingType type, String source, String queue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+    }
+  }
public CapacitySchedulerConfiguration() {
this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
+ public boolean getOverrideWithQueueMappings() {
+ return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+ DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+ }
+
+ /**
+ * Returns a collection of strings, trimming leading and trailing whitespace
+ * on each value and skipping values that are empty after trimming
+ *
+ * @param str
+ * String to parse
+ * @param delim
+ * delimiter to separate the values
+ * @return Collection of parsed elements.
+ */
+ private static Collection<String> getTrimmedStringCollection(String str,
+ String delim) {
+ List<String> values = new ArrayList<String>();
+ if (str == null)
+ return values;
+ StringTokenizer tokenizer = new StringTokenizer(str, delim);
+ while (tokenizer.hasMoreTokens()) {
+ String next = tokenizer.nextToken();
+ if (next == null || next.trim().isEmpty()) {
+ continue;
+ }
+ values.add(next.trim());
+ }
+ return values;
+ }
+
+ /**
+ * Get user/group mappings to queues.
+ *
+ * @return user/group mappings; throws IllegalArgumentException on illegal configs
+ */
+ public List<QueueMapping> getQueueMappings() {
+ List<QueueMapping> mappings =
+ new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+ Collection<String> mappingsString =
+ getTrimmedStringCollection(QUEUE_MAPPING);
+ for (String mappingValue : mappingsString) {
+ String[] mapping =
+ getTrimmedStringCollection(mappingValue, ":")
+ .toArray(new String[] {});
+ if (mapping.length != 3 || mapping[1].length() == 0
+ || mapping[2].length() == 0) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ QueueMapping m;
+ try {
+ QueueMapping.MappingType mappingType;
+ if (mapping[0].equals("u")) {
+ mappingType = QueueMapping.MappingType.USER;
+ } else if (mapping[0].equals("g")) {
+ mappingType = QueueMapping.MappingType.GROUP;
+ } else {
+ throw new IllegalArgumentException(
+ "unknown mapping prefix " + mapping[0]);
+ }
+ m = new QueueMapping(
+ mappingType,
+ mapping[1],
+ mapping[2]);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ if (m != null) {
+ mappings.add(m);
+ }
+ }
+
+ return mappings;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java Tue Aug 19 23:49:39 2014
@@ -43,7 +43,7 @@ public interface CapacitySchedulerContex
RMContext getRMContext();
- Resource getClusterResources();
+ Resource getClusterResource();
/**
* Get the yarn configuration.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Aug 19 23:49:39 2014
@@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.annotations.VisibleForTesting;
+
@Private
@Unstable
public class LeafQueue implements CSQueue {
@@ -174,12 +176,12 @@ public class LeafQueue implements CSQueu
int maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications(
resourceCalculator,
- cs.getClusterResources(), this.minimumAllocation,
+ cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteMaxCapacity);
this.maxActiveAppsUsingAbsCap =
CSQueueUtils.computeMaxActiveApplications(
resourceCalculator,
- cs.getClusterResources(), this.minimumAllocation,
+ cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteCapacity);
int maxActiveApplicationsPerUser =
CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit,
@@ -195,7 +197,7 @@ public class LeafQueue implements CSQueu
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(
- cs.getClusterResources(),
+ cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
@@ -564,7 +566,8 @@ public class LeafQueue implements CSQueu
"numContainers=" + getNumContainers();
}
- private synchronized User getUser(String userName) {
+ @VisibleForTesting
+ public synchronized User getUser(String userName) {
User user = users.get(userName);
if (user == null) {
user = new User();
@@ -640,7 +643,10 @@ public class LeafQueue implements CSQueu
addApplicationAttempt(application, user);
}
- metrics.submitAppAttempt(userName);
+ // We don't want to update metrics for move app
+ if (application.isPending()) {
+ metrics.submitAppAttempt(userName);
+ }
getParent().submitApplicationAttempt(application, userName);
}
@@ -698,7 +704,6 @@ public class LeafQueue implements CSQueu
throw ace;
}
- metrics.submitApp(userName);
}
private synchronized void activateApplications() {
@@ -973,13 +978,18 @@ public class LeafQueue implements CSQueu
Resource userLimit = // User limit
computeUserLimit(application, clusterResource, required);
-
+
+ //Max avail capacity needs to take into account usage by ancestor-siblings
+ //which are greater than their base capacity, so we are interested in "max avail"
+ //capacity
+ float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
+ resourceCalculator, clusterResource, this);
Resource queueMaxCap = // Queue Max-Capacity
Resources.multiplyAndNormalizeDown(
resourceCalculator,
clusterResource,
- absoluteMaxCapacity,
+ absoluteMaxAvailCapacity,
minimumAllocation);
Resource userConsumed = getUser(user).getConsumedResources();
@@ -1346,8 +1356,7 @@ public class LeafQueue implements CSQueu
}
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- allocatedContainer);
+ node.allocateContainer(allocatedContainer);
LOG.info("assignedContainer" +
" application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1455,7 @@ public class LeafQueue implements CSQueu
}
synchronized void allocateResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
+ SchedulerApplicationAttempt application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1539,8 @@ public class LeafQueue implements CSQueu
return metrics;
}
- static class User {
+ @VisibleForTesting
+ public static class User {
Resource consumed = Resources.createResource(0, 0);
int pendingApplications = 0;
int activeApplications = 0;
@@ -1580,13 +1590,16 @@ public class LeafQueue implements CSQueu
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, application, container.getResource());
+ allocateResource(clusterResource, attempt, rmContainer.getContainer()
+ .getResource());
}
- getParent().recoverContainer(clusterResource, application, container);
-
+ getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
@@ -1609,9 +1622,43 @@ public class LeafQueue implements CSQueu
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
+ for (FiCaSchedulerApp pendingApp : pendingApplications) {
+ apps.add(pendingApp.getApplicationAttemptId());
+ }
for (FiCaSchedulerApp app : activeApplications) {
apps.add(app.getApplicationAttemptId());
}
}
+ @Override
+ public void attachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer rmContainer) {
+ if (application != null) {
+ allocateResource(clusterResource, application, rmContainer.getContainer()
+ .getResource());
+ LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ + " resource=" + rmContainer.getContainer().getResource()
+ + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
+ + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ + usedResources + " cluster=" + clusterResource);
+ // Inform the parent queue
+ getParent().attachContainer(clusterResource, application, rmContainer);
+ }
+ }
+
+ @Override
+ public void detachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer rmContainer) {
+ if (application != null) {
+ releaseResource(clusterResource, application, rmContainer.getContainer()
+ .getResource());
+ LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ + " resource=" + rmContainer.getContainer().getResource()
+ + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
+ + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ + usedResources + " cluster=" + clusterResource);
+ // Inform the parent queue
+ getParent().detachContainer(clusterResource, application, rmContainer);
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Aug 19 23:49:39 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -143,7 +144,7 @@ public class ParentQueue implements CSQu
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
- setupQueueConfigs(cs.getClusterResources(),
+ setupQueueConfigs(cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls);
@@ -770,13 +771,16 @@ public class ParentQueue implements CSQu
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, container.getResource());
+ allocateResource(clusterResource,rmContainer.getContainer().getResource());
}
if (parent != null) {
- parent.recoverContainer(clusterResource, application, container);
+ parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
@@ -787,4 +791,37 @@ public class ParentQueue implements CSQu
queue.collectSchedulerApplications(apps);
}
}
+
+ @Override
+ public void attachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer rmContainer) {
+ if (application != null) {
+ allocateResource(clusterResource, rmContainer.getContainer()
+ .getResource());
+ LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+ + clusterResource);
+ // Inform the parent
+ if (parent != null) {
+ parent.attachContainer(clusterResource, application, rmContainer);
+ }
+ }
+ }
+
+ @Override
+ public void detachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer rmContainer) {
+ if (application != null) {
+ releaseResource(clusterResource, rmContainer.getContainer().getResource());
+ LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+ + clusterResource);
+ // Inform the parent
+ if (parent != null) {
+ parent.detachContainer(clusterResource, application, rmContainer);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Aug 19 23:49:39 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
if (null == liveContainers.remove(rmContainer.getContainerId())) {
return false;
}
+
+ // Remove from the list of newly allocated containers if found
+ newlyAllocatedContainers.remove(rmContainer);
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
- appSchedulingInfo.allocate(type, node, priority, request, container);
+ List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+ type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
+
+ // Update resource requests related to "request" and store in RMContainer
+ ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,248 +18,85 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.util.resource.Resources;
public class FiCaSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
- private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
- private Resource totalResourceCapability;
-
- private volatile int numContainers;
-
- private RMContainer reservedContainer;
-
- /* set of containers that are allocated containers */
- private final Map<ContainerId, RMContainer> launchedContainers =
- new HashMap<ContainerId, RMContainer>();
-
- private final RMNode rmNode;
- private final String nodeName;
-
public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
- this.rmNode = node;
- this.availableResource.setMemory(node.getTotalCapability().getMemory());
- this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
- totalResourceCapability =
- Resource.newInstance(node.getTotalCapability().getMemory(), node
- .getTotalCapability().getVirtualCores());
- if (usePortForNodeName) {
- nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
- } else {
- nodeName = rmNode.getHostName();
- }
- }
-
- public RMNode getRMNode() {
- return this.rmNode;
- }
-
- public NodeId getNodeID() {
- return this.rmNode.getNodeID();
- }
-
- public String getHttpAddress() {
- return this.rmNode.getHttpAddress();
- }
-
- @Override
- public String getNodeName() {
- return nodeName;
- }
-
- @Override
- public String getRackName() {
- return this.rmNode.getRackName();
- }
-
- /**
- * The Scheduler has allocated containers on this node to the
- * given application.
- *
- * @param applicationId application
- * @param rmContainer allocated container
- */
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
- Container container = rmContainer.getContainer();
- deductAvailableResource(container.getResource());
- ++numContainers;
-
- launchedContainers.put(container.getId(), rmContainer);
-
- LOG.info("Assigned container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " +
- getAvailableResource() + " available");
- }
-
- @Override
- public synchronized Resource getAvailableResource() {
- return this.availableResource;
- }
-
- @Override
- public synchronized Resource getUsedResource() {
- return this.usedResource;
- }
-
- @Override
- public Resource getTotalResource() {
- return this.totalResourceCapability;
- }
-
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId()))
- return true;
- return false;
- }
-
- private synchronized void updateResource(Container container) {
- addAvailableResource(container.getResource());
- --numContainers;
- }
-
- /**
- * Release an allocated container on this node.
- * @param container container to be released
- */
- public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
- LOG.error("Invalid container released " + container);
- return;
- }
-
- /* remove the containers from the nodemanger */
- if (null != launchedContainers.remove(container.getId())) {
- updateResource(container);
- }
-
- LOG.info("Released container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " + getAvailableResource()
- + " available" + ", release resources=" + true);
- }
-
-
- private synchronized void addAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.addTo(availableResource, resource);
- Resources.subtractFrom(usedResource, resource);
- }
-
- private synchronized void deductAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.subtractFrom(availableResource, resource);
- Resources.addTo(usedResource, resource);
+ super(node, usePortForNodeName);
}
@Override
- public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource().getMemory() +
- " used=" + getUsedResource().getMemory();
- }
-
- @Override
- public int getNumContainers() {
- return numContainers;
- }
-
- public synchronized List<RMContainer> getRunningContainers() {
- return new ArrayList<RMContainer>(launchedContainers.values());
- }
-
public synchronized void reserveResource(
- SchedulerApplicationAttempt application, Priority priority,
- RMContainer reservedContainer) {
+ SchedulerApplicationAttempt application, Priority priority,
+ RMContainer container) {
// Check if it's already reserved
- if (this.reservedContainer != null) {
+ RMContainer reservedContainer = getReservedContainer();
+ if (reservedContainer != null) {
// Sanity check
- if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+ if (!container.getContainer().getNodeId().equals(getNodeID())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " on node " + reservedContainer.getReservedNode() +
- " when currently" + " reserved resource " + this.reservedContainer +
- " on node " + this.reservedContainer.getReservedNode());
+ " container " + container +
+ " on node " + container.getReservedNode() +
+ " when currently" + " reserved resource " + reservedContainer +
+ " on node " + reservedContainer.getReservedNode());
}
// Cannot reserve more than one application attempt on a given node!
// Reservation is still against attempt.
- if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
- reservedContainer.getContainer().getId().getApplicationAttemptId())) {
+ if (!reservedContainer.getContainer().getId().getApplicationAttemptId()
+ .equals(container.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
+ " container " + container +
" for application " + application.getApplicationAttemptId() +
" when currently" +
- " reserved container " + this.reservedContainer +
+ " reserved container " + reservedContainer +
" on node " + this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updated reserved container "
- + reservedContainer.getContainer().getId() + " on node " + this
+ + container.getContainer().getId() + " on node " + this
+ " for application attempt "
+ application.getApplicationAttemptId());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Reserved container "
- + reservedContainer.getContainer().getId() + " on node " + this
+ + container.getContainer().getId() + " on node " + this
+ " for application attempt "
+ application.getApplicationAttemptId());
}
}
- this.reservedContainer = reservedContainer;
+ setReservedContainer(container);
}
+ @Override
public synchronized void unreserveResource(
SchedulerApplicationAttempt application) {
-
+
// adding NP checks as this can now be called for preemption
- if (reservedContainer != null
- && reservedContainer.getContainer() != null
- && reservedContainer.getContainer().getId() != null
- && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) {
+ if (getReservedContainer() != null
+ && getReservedContainer().getContainer() != null
+ && getReservedContainer().getContainer().getId() != null
+ && getReservedContainer().getContainer().getId()
+ .getApplicationAttemptId() != null) {
// Cannot unreserve for wrong application...
ApplicationAttemptId reservedApplication =
- reservedContainer.getContainer().getId().getApplicationAttemptId();
+ getReservedContainer().getContainer().getId()
+ .getApplicationAttemptId();
if (!reservedApplication.equals(
application.getApplicationAttemptId())) {
throw new IllegalStateException("Trying to unreserve " +
@@ -269,17 +106,6 @@ public class FiCaSchedulerNode extends S
" on node " + this);
}
}
- reservedContainer = null;
- }
-
- public synchronized RMContainer getReservedContainer() {
- return reservedContainer;
+ setReservedContainer(null);
}
-
- @Override
- public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
- // we can only adjust available resource if total resource is changed.
- Resources.addTo(this.availableResource, deltaResource);
- }
-
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
private final ApplicationId applicationId;
private final String queue;
private final String user;
+ private final boolean isAppRecovering;
public AppAddedSchedulerEvent(
ApplicationId applicationId, String queue, String user) {
+ this(applicationId, queue, user, false);
+ }
+
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user, boolean isAppRecovering) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
this.user = user;
+ this.isAppRecovering = isAppRecovering;
}
public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
return user;
}
+ public boolean getIsAppRecovering() {
+ return isAppRecovering;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -24,13 +24,22 @@ public class AppAttemptAddedSchedulerEve
private final ApplicationAttemptId applicationAttemptId;
private final boolean transferStateFromPreviousAttempt;
+ private final boolean isAttemptRecovering;
public AppAttemptAddedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt) {
+ this(applicationAttemptId, transferStateFromPreviousAttempt, false);
+ }
+
+ public AppAttemptAddedSchedulerEvent(
+ ApplicationAttemptId applicationAttemptId,
+ boolean transferStateFromPreviousAttempt,
+ boolean isAttemptRecovering) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+ this.isAttemptRecovering = isAttemptRecovering;
}
public ApplicationAttemptId getApplicationAttemptId() {
@@ -40,4 +49,8 @@ public class AppAttemptAddedSchedulerEve
public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt;
}
+
+ public boolean getIsAttemptRecovering() {
+ return isAttemptRecovering;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java Tue Aug 19 23:49:39 2014
@@ -18,19 +18,34 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeAddedSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
+ private final List<NMContainerStatus> containerReports;
public NodeAddedSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_ADDED);
this.rmNode = rmNode;
+ this.containerReports = null;
+ }
+
+ public NodeAddedSchedulerEvent(RMNode rmNode,
+ List<NMContainerStatus> containerReports) {
+ super(SchedulerEventType.NODE_ADDED);
+ this.rmNode = rmNode;
+ this.containerReports = containerReports;
}
public RMNode getAddedRMNode() {
return rmNode;
}
+ public List<NMContainerStatus> getContainerReports() {
+ return containerReports;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,10 @@ public class AllocationConfiguration {
private final int userMaxAppsDefault;
private final int queueMaxAppsDefault;
+ // Maximum resource share for each leaf queue that can be used to run AMs
+ final Map<String, Float> queueMaxAMShares;
+ private final float queueMaxAMShareDefault;
+
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
@@ -77,26 +81,32 @@ public class AllocationConfiguration {
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
+ //Configured queues in the alloc xml
@VisibleForTesting
- Set<String> queueNames;
+ Map<FSQueueType, Set<String>> configuredQueues;
- public AllocationConfiguration(Map<String, Resource> minQueueResources,
- Map<String, Resource> maxQueueResources,
+ public AllocationConfiguration(Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies,
+ Map<String, ResourceWeights> queueWeights,
+ Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
+ int queueMaxAppsDefault, float queueMaxAMShareDefault,
+ Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
- Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
- QueuePlacementPolicy placementPolicy, Set<String> queueNames) {
+ QueuePlacementPolicy placementPolicy,
+ Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
+ this.queueMaxAMShares = queueMaxAMShares;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
@@ -104,7 +114,7 @@ public class AllocationConfiguration {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy;
- this.queueNames = queueNames;
+ this.configuredQueues = configuredQueues;
}
public AllocationConfiguration(Configuration conf) {
@@ -113,17 +123,22 @@ public class AllocationConfiguration {
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
+ queueMaxAMShares = new HashMap<String, Float>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
+ queueMaxAMShareDefault = -1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
+ configuredQueues = new HashMap<FSQueueType, Set<String>>();
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
- new HashSet<String>());
- queueNames = new HashSet<String>();
+ configuredQueues);
}
/**
@@ -178,6 +193,11 @@ public class AllocationConfiguration {
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
+ public float getQueueMaxAMShare(String queue) {
+ Float maxAMShare = queueMaxAMShares.get(queue);
+ return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
+ }
+
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
@@ -221,8 +241,8 @@ public class AllocationConfiguration {
return defaultSchedulingPolicy;
}
- public Set<String> getQueueNames() {
- return queueNames;
+ public Map<FSQueueType, Set<String>> getConfiguredQueues() {
+ return configuredQueues;
}
public QueuePlacementPolicy getPlacementPolicy() {