You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/01/03 08:27:13 UTC
svn commit: r1555021 [4/7] - in
/hadoop/common/branches/HDFS-5535/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/j...
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Fri Jan 3 07:26:52 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
private final Collection<RMContainer> reserved;
private final boolean pending;
- public SchedulerAppReport(SchedulerApplication app) {
+ public SchedulerAppReport(SchedulerApplicationAttempt app) {
this.live = app.getLiveContainers();
this.reserved = app.getReservedContainers();
this.pending = app.isPending();
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Fri Jan 3 07:26:52 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
public class SchedulerAppUtils {
- public static boolean isBlacklisted(SchedulerApplication application,
+ public static boolean isBlacklisted(SchedulerApplicationAttempt application,
SchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Jan 3 07:26:52 2014
@@ -17,393 +17,26 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-/**
- * Represents an application attempt from the viewpoint of the scheduler.
- * Each running app attempt in the RM corresponds to one instance
- * of this class.
- */
@Private
@Unstable
-public abstract class SchedulerApplication {
-
- private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
-
- protected final AppSchedulingInfo appSchedulingInfo;
-
- protected final Map<ContainerId, RMContainer> liveContainers =
- new HashMap<ContainerId, RMContainer>();
- protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
- new HashMap<Priority, Map<NodeId, RMContainer>>();
-
- private final Multiset<Priority> reReservations = HashMultiset.create();
-
- protected final Resource currentReservation = Resource.newInstance(0, 0);
- private Resource resourceLimit = Resource.newInstance(0, 0);
- protected final Resource currentConsumption = Resource.newInstance(0, 0);
+public class SchedulerApplication {
- protected List<RMContainer> newlyAllocatedContainers =
- new ArrayList<RMContainer>();
+ private final Queue queue;
+ private final String user;
- /**
- * Count how many times the application has been given an opportunity
- * to schedule a task at each priority. Each time the scheduler
- * asks the application for a task at this priority, it is incremented,
- * and each time the application successfully schedules a task, it
- * is reset to 0.
- */
- Multiset<Priority> schedulingOpportunities = HashMultiset.create();
-
- // Time of the last container scheduled at the current allowed level
- protected Map<Priority, Long> lastScheduledContainer =
- new HashMap<Priority, Long>();
-
- protected final Queue queue;
- protected boolean isStopped = false;
-
- protected final RMContext rmContext;
-
- public SchedulerApplication(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
- RMContext rmContext) {
- this.rmContext = rmContext;
- this.appSchedulingInfo =
- new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
+ public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
- }
-
- /**
- * Get the live containers of the application.
- * @return live containers of the application
- */
- public synchronized Collection<RMContainer> getLiveContainers() {
- return new ArrayList<RMContainer>(liveContainers.values());
- }
-
- /**
- * Is this application pending?
- * @return true if it is else false.
- */
- public boolean isPending() {
- return appSchedulingInfo.isPending();
- }
-
- /**
- * Get {@link ApplicationAttemptId} of the application master.
- * @return <code>ApplicationAttemptId</code> of the application master
- */
- public ApplicationAttemptId getApplicationAttemptId() {
- return appSchedulingInfo.getApplicationAttemptId();
- }
-
- public ApplicationId getApplicationId() {
- return appSchedulingInfo.getApplicationId();
- }
-
- public String getUser() {
- return appSchedulingInfo.getUser();
- }
-
- public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
- return appSchedulingInfo.getResourceRequests(priority);
- }
-
- public int getNewContainerId() {
- return appSchedulingInfo.getNewContainerId();
- }
-
- public Collection<Priority> getPriorities() {
- return appSchedulingInfo.getPriorities();
- }
-
- public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
- return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
- }
-
- public synchronized int getTotalRequiredResources(Priority priority) {
- return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
- }
-
- public Resource getResource(Priority priority) {
- return appSchedulingInfo.getResource(priority);
- }
-
- public String getQueueName() {
- return appSchedulingInfo.getQueueName();
- }
-
- public synchronized RMContainer getRMContainer(ContainerId id) {
- return liveContainers.get(id);
- }
-
- protected synchronized void resetReReservations(Priority priority) {
- reReservations.setCount(priority, 0);
- }
-
- protected synchronized void addReReservation(Priority priority) {
- reReservations.add(priority);
- }
-
- public synchronized int getReReservations(Priority priority) {
- return reReservations.count(priority);
+ this.user = user;
}
- /**
- * Get total current reservations.
- * Used only by unit tests
- * @return total current reservations
- */
- @Stable
- @Private
- public synchronized Resource getCurrentReservation() {
- return currentReservation;
- }
-
public Queue getQueue() {
return queue;
}
-
- public synchronized void updateResourceRequests(
- List<ResourceRequest> requests) {
- if (!isStopped) {
- appSchedulingInfo.updateResourceRequests(requests);
- }
- }
-
- public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
- // Cleanup all scheduling information
- isStopped = true;
- appSchedulingInfo.stop(rmAppAttemptFinalState);
- }
-
- public synchronized boolean isStopped() {
- return isStopped;
- }
-
- /**
- * Get the list of reserved containers
- * @return All of the reserved containers.
- */
- public synchronized List<RMContainer> getReservedContainers() {
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
- this.reservedContainers.entrySet()) {
- reservedContainers.addAll(e.getValue().values());
- }
- return reservedContainers;
- }
-
- public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
- RMContainer rmContainer, Container container) {
- // Create RMContainer if necessary
- if (rmContainer == null) {
- rmContainer =
- new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
- rmContext.getContainerAllocationExpirer());
-
- Resources.addTo(currentReservation, container.getResource());
-
- // Reset the re-reservation count
- resetReReservations(priority);
- } else {
- // Note down the re-reservation
- addReReservation(priority);
- }
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
- container.getResource(), node.getNodeID(), priority));
-
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers == null) {
- reservedContainers = new HashMap<NodeId, RMContainer>();
- this.reservedContainers.put(priority, reservedContainers);
- }
- reservedContainers.put(node.getNodeID(), rmContainer);
-
- LOG.info("Application " + getApplicationId()
- + " reserved container " + rmContainer
- + " on node " + node + ", currently has " + reservedContainers.size()
- + " at priority " + priority
- + "; currentReservation " + currentReservation.getMemory());
-
- return rmContainer;
- }
-
- /**
- * Has the application reserved the given <code>node</code> at the
- * given <code>priority</code>?
- * @param node node to be checked
- * @param priority priority of reserved container
- * @return true is reserved, false if not
- */
- public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- if (reservedContainers != null) {
- return reservedContainers.containsKey(node.getNodeID());
- }
- return false;
- }
-
- public synchronized void setHeadroom(Resource globalLimit) {
- this.resourceLimit = globalLimit;
- }
-
- /**
- * Get available headroom in terms of resources for the application's user.
- * @return available resource headroom
- */
- public synchronized Resource getHeadroom() {
- // Corner case to deal with applications being slightly over-limit
- if (resourceLimit.getMemory() < 0) {
- resourceLimit.setMemory(0);
- }
-
- return resourceLimit;
- }
-
- public synchronized int getNumReservedContainers(Priority priority) {
- Map<NodeId, RMContainer> reservedContainers =
- this.reservedContainers.get(priority);
- return (reservedContainers == null) ? 0 : reservedContainers.size();
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void containerLaunchedOnNode(ContainerId containerId,
- NodeId nodeId) {
- // Inform the container
- RMContainer rmContainer = getRMContainer(containerId);
- if (rmContainer == null) {
- // Some unknown container sneaked into the system. Kill it.
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
- return;
- }
-
- rmContainer.handle(new RMContainerEvent(containerId,
- RMContainerEventType.LAUNCHED));
- }
-
- public synchronized void showRequests() {
- if (LOG.isDebugEnabled()) {
- for (Priority priority : getPriorities()) {
- Map<String, ResourceRequest> requests = getResourceRequests(priority);
- if (requests != null) {
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
- " headRoom=" + getHeadroom() +
- " currentConsumption=" + currentConsumption.getMemory());
- for (ResourceRequest request : requests.values()) {
- LOG.debug("showRequests:" + " application=" + getApplicationId()
- + " request=" + request);
- }
- }
- }
- }
- }
-
- public Resource getCurrentConsumption() {
- return currentConsumption;
- }
-
- public synchronized List<Container> pullNewlyAllocatedContainers() {
- List<Container> returnContainerList = new ArrayList<Container>(
- newlyAllocatedContainers.size());
- for (RMContainer rmContainer : newlyAllocatedContainers) {
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
- RMContainerEventType.ACQUIRED));
- returnContainerList.add(rmContainer.getContainer());
- }
- newlyAllocatedContainers.clear();
- return returnContainerList;
- }
- public synchronized void updateBlacklist(
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
- if (!isStopped) {
- this.appSchedulingInfo.updateBlacklist(
- blacklistAdditions, blacklistRemovals);
- }
- }
-
- public boolean isBlacklisted(String resourceName) {
- return this.appSchedulingInfo.isBlacklisted(resourceName);
- }
-
- public synchronized void addSchedulingOpportunity(Priority priority) {
- schedulingOpportunities.setCount(priority,
- schedulingOpportunities.count(priority) + 1);
- }
-
- public synchronized void subtractSchedulingOpportunity(Priority priority) {
- int count = schedulingOpportunities.count(priority) - 1;
- this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
- }
-
- /**
- * Return the number of times the application has been given an opportunity
- * to schedule a task at the given priority since the last time it
- * successfully did so.
- */
- public synchronized int getSchedulingOpportunities(Priority priority) {
- return schedulingOpportunities.count(priority);
- }
-
- /**
- * Should be called when an application has successfully scheduled a container,
- * or when the scheduling locality threshold is relaxed.
- * Reset various internal counters which affect delay scheduling
- *
- * @param priority The priority of the container scheduled.
- */
- public synchronized void resetSchedulingOpportunities(Priority priority) {
- resetSchedulingOpportunities(priority, System.currentTimeMillis());
- }
- // used for continuous scheduling
- public synchronized void resetSchedulingOpportunities(Priority priority,
- long currentTimeMs) {
- lastScheduledContainer.put(priority, currentTimeMs);
- schedulingOpportunities.setCount(priority, 0);
- }
-
- public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
- return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
- reservedContainers.size(), Resources.clone(currentConsumption),
- Resources.clone(currentReservation),
- Resources.add(currentConsumption, currentReservation));
+ public String getUser() {
+ return user;
}
-
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Jan 3 07:26:52 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -35,7 +36,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -155,21 +155,32 @@ extends org.apache.hadoop.yarn.server.re
/**
* Submit a new application to the queue.
- * @param application application being submitted
+ * @param applicationId the applicationId of the application being submitted
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
- public void submitApplication(FiCaSchedulerApp application, String user,
- String queue)
- throws AccessControlException;
-
+ public void submitApplication(ApplicationId applicationId, String user,
+ String queue) throws AccessControlException;
+
+ /**
+ * Submit an application attempt to the queue.
+ */
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName);
+
/**
* An application submitted to this queue has finished.
- * @param application
- * @param queue application queue
+ * @param applicationId
+ * @param user user who submitted the application
*/
- public void finishApplication(FiCaSchedulerApp application, String queue);
-
+ public void finishApplication(ApplicationId applicationId, String user);
+
+ /**
+ * An application attempt submitted to this queue has finished.
+ */
+ public void finishApplicationAttempt(FiCaSchedulerApp application,
+ String queue);
+
/**
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jan 3 07:26:52 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +54,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -65,13 +69,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -185,7 +191,11 @@ public class CapacityScheduler
private Resource maximumAllocation;
@VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+ @VisibleForTesting
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;
@@ -415,61 +425,84 @@ public class CapacityScheduler
synchronized CSQueue getQueue(String queueName) {
return queues.get(queueName);
}
-
- private synchronized void
- addApplication(ApplicationAttemptId applicationAttemptId,
- String queueName, String user) {
- // Sanity checks
+ private synchronized void addApplication(ApplicationId applicationId,
+ String queueName, String user) {
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
- String message = "Application " + applicationAttemptId +
+ String message = "Application " + applicationId +
" submitted by user " + user + " to unknown queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
if (!(queue instanceof LeafQueue)) {
- String message = "Application " + applicationAttemptId +
+ String message = "Application " + applicationId +
" submitted by user " + user + " to non-leaf queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
-
- // TODO: Fix store
- FiCaSchedulerApp SchedulerApp =
- new FiCaSchedulerApp(applicationAttemptId, user, queue,
- queue.getActiveUsersManager(), rmContext);
-
// Submit to the queue
try {
- queue.submitApplication(SchedulerApp, user, queueName);
+ queue.submitApplication(applicationId, user, queueName);
} catch (AccessControlException ace) {
- LOG.info("Failed to submit application " + applicationAttemptId +
- " to queue " + queueName + " from user " + user, ace);
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId,
- ace.toString()));
+ LOG.info("Failed to submit application " + applicationId + " to queue "
+ + queueName + " from user " + user, ace);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
return;
}
+ SchedulerApplication application =
+ new SchedulerApplication(queue, user);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
- applications.put(applicationAttemptId, SchedulerApp);
-
- LOG.info("Application Submission: " + applicationAttemptId +
- ", user: " + user +
- " queue: " + queue +
- ", currently active: " + applications.size());
-
+ private synchronized void addApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ CSQueue queue = (CSQueue) application.getQueue();
+
+ FiCaSchedulerApp SchedulerApp =
+ new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
+ queue, queue.getActiveUsersManager(), rmContext);
+ appAttempts.put(applicationAttemptId, SchedulerApp);
+ queue.submitApplicationAttempt(SchedulerApp, application.getUser());
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user " + application.getUser() + " in queue "
+ + queue.getQueueName());
rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
- private synchronized void doneApplication(
+ private synchronized void doneApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ SchedulerApplication application = applications.get(applicationId);
+ if (application == null){
+ // The AppRemovedSchedulerEvent may be sent on recovery for completed apps.
+ return;
+ }
+ CSQueue queue = (CSQueue) application.getQueue();
+ if (!(queue instanceof LeafQueue)) {
+ LOG.error("Cannot finish application " + "from non-leaf queue: "
+ + queue.getQueueName());
+ } else {
+ queue.finishApplication(applicationId, application.getUser());
+ }
+ applications.remove(applicationId);
+ }
+
+ private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
- LOG.info("Application " + applicationAttemptId + " is done." +
+ LOG.info("Application Attempt " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
FiCaSchedulerApp application = getApplication(applicationAttemptId);
@@ -509,11 +542,11 @@ public class CapacityScheduler
LOG.error("Cannot finish application " + "from non-leaf queue: "
+ queueName);
} else {
- queue.finishApplication(application, queue.getQueueName());
+ queue.finishApplicationAttempt(application, queue.getQueueName());
}
// Remove from our data-structure
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
private static final Allocation EMPTY_ALLOCATION =
@@ -742,16 +775,31 @@ public class CapacityScheduler
break;
case APP_ADDED:
{
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
- addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
- .getQueue(), appAddedEvent.getUser());
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
}
break;
case APP_REMOVED:
{
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- doneApplication(appRemovedEvent.getApplicationAttemptID(),
- appRemovedEvent.getFinalAttemptState());
+ doneApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ }
+ break;
+ case APP_ATTEMPT_ADDED:
+ {
+ AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+ (AppAttemptAddedSchedulerEvent) event;
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ }
+ break;
+ case APP_ATTEMPT_REMOVED:
+ {
+ AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+ (AppAttemptRemovedSchedulerEvent) event;
+ doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState());
}
break;
case CONTAINER_EXPIRED:
@@ -852,7 +900,7 @@ public class CapacityScheduler
@Lock(Lock.NoLock.class)
FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
- return applications.get(applicationAttemptId);
+ return appAttempts.get(applicationAttemptId);
}
@Override
@@ -910,7 +958,7 @@ public class CapacityScheduler
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
- FiCaSchedulerApp app = applications.get(aid);
+ FiCaSchedulerApp app = appAttempts.get(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Jan 3 07:26:52 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -59,7 +60,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
private volatile int numContainers;
Set<FiCaSchedulerApp> activeApplications;
- Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap =
+ Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
Set<FiCaSchedulerApp> pendingApplications;
@@ -635,7 +635,22 @@ public class LeafQueue implements CSQueu
}
@Override
- public void submitApplication(FiCaSchedulerApp application, String userName,
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName) {
+ // Careful! Locking order is important!
+ synchronized (this) {
+ User user = getUser(userName);
+ // Add the attempt to our data-structures
+ addApplicationAttempt(application, user);
+ }
+
+ int attemptId = application.getApplicationAttemptId().getAttemptId();
+ metrics.submitApp(userName, attemptId);
+ getParent().submitApplicationAttempt(application, userName);
+ }
+
+ @Override
+ public void submitApplication(ApplicationId applicationId, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -653,8 +668,7 @@ public class LeafQueue implements CSQueu
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath() +
- " is STOPPED. Cannot accept submission of application: " +
- application.getApplicationId();
+ " is STOPPED. Cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -663,8 +677,7 @@ public class LeafQueue implements CSQueu
if (getNumApplications() >= getMaxApplications()) {
String msg = "Queue " + getQueuePath() +
" already has " + getNumApplications() + " applications," +
- " cannot accept submission of application: " +
- application.getApplicationId();
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -675,26 +688,18 @@ public class LeafQueue implements CSQueu
String msg = "Queue " + getQueuePath() +
" already has " + user.getTotalApplications() +
" applications from user " + userName +
- " cannot accept submission of application: " +
- application.getApplicationId();
+ " cannot accept submission of application: " + applicationId;
LOG.info(msg);
throw new AccessControlException(msg);
}
-
- // Add the application to our data-structures
- addApplication(application, user);
}
- int attemptId = application.getApplicationAttemptId().getAttemptId();
- metrics.submitApp(userName, attemptId);
-
// Inform the parent queue
try {
- getParent().submitApplication(application, userName, queue);
+ getParent().submitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
- removeApplication(application, user);
throw ace;
}
}
@@ -722,11 +727,11 @@ public class LeafQueue implements CSQueu
}
}
- private synchronized void addApplication(FiCaSchedulerApp application, User user) {
+ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
- applicationsMap.put(application.getApplicationAttemptId(), application);
+ applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications
activateApplications();
@@ -742,22 +747,28 @@ public class LeafQueue implements CSQueu
}
@Override
- public void finishApplication(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(ApplicationId application, String user) {
+ // Inform the activeUsersManager
+ activeUsersManager.deactivateApplication(user, application);
+ // Inform the parent queue
+ getParent().finishApplication(application, user);
+ }
+
+ @Override
+ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
- removeApplication(application, getUser(application.getUser()));
+ removeApplicationAttempt(application, getUser(application.getUser()));
}
-
- // Inform the parent queue
- getParent().finishApplication(application, queue);
+ getParent().finishApplicationAttempt(application, queue);
}
- public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
+ public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
}
- applicationsMap.remove(application.getApplicationAttemptId());
+ applicationAttemptMap.remove(application.getApplicationAttemptId());
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
@@ -766,13 +777,7 @@ public class LeafQueue implements CSQueu
// Check if we can activate more applications
activateApplications();
-
- // Inform the activeUsersManager
- synchronized (application) {
- activeUsersManager.deactivateApplication(
- application.getUser(), application.getApplicationId());
- }
-
+
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
@@ -783,10 +788,10 @@ public class LeafQueue implements CSQueu
" #queue-active-applications: " + getNumActiveApplications()
);
}
-
+
private synchronized FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
- return applicationsMap.get(applicationAttemptId);
+ return applicationAttemptMap.get(applicationAttemptId);
}
private static final CSAssignment NULL_ASSIGNMENT =
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Jan 3 07:26:52 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void submitApplication(FiCaSchedulerApp application, String user,
+ public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -455,57 +455,70 @@ public class ParentQueue implements CSQu
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
- application.getApplicationId());
+ applicationId);
}
- addApplication(application, user);
+ addApplication(applicationId, user);
}
// Inform the parent queue
if (parent != null) {
try {
- parent.submitApplication(application, user, queue);
+ parent.submitApplication(applicationId, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
- removeApplication(application, user);
+ removeApplication(applicationId, user);
throw ace;
}
}
}
- private synchronized void addApplication(FiCaSchedulerApp application,
+
+ @Override
+ public void submitApplicationAttempt(FiCaSchedulerApp application,
+ String userName) {
+ // Currently a no-op: ParentQueue only counts applications; attempts are tracked by LeafQueue.
+ }
+
+ @Override
+ public void finishApplicationAttempt(FiCaSchedulerApp application,
+ String queue) {
+ // Currently a no-op: ParentQueue does not track individual application attempts.
+ }
+
+ private synchronized void addApplication(ApplicationId applicationId,
String user) {
-
+
++numApplications;
LOG.info("Application added -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
@Override
- public void finishApplication(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(ApplicationId application, String user) {
synchronized (this) {
- removeApplication(application, application.getUser());
+ removeApplication(application, user);
}
// Inform the parent queue
if (parent != null) {
- parent.finishApplication(application, queue);
+ parent.finishApplication(application, user);
}
}
- public synchronized void removeApplication(FiCaSchedulerApp application,
+ public synchronized void removeApplication(ApplicationId applicationId,
String user) {
--numApplications;
LOG.info("Application removed -" +
- " appId: " + application.getApplicationId() +
+ " appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Jan 3 07:26:52 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FiCaSchedulerApp extends SchedulerApplication {
+public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Jan 3 07:26:52 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void reserveResource(
- SchedulerApplication application, Priority priority,
+ SchedulerApplicationAttempt application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
@@ -241,7 +241,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void unreserveResource(
- SchedulerApplication application) {
+ SchedulerApplicationAttempt application) {
// adding NP checks as this can now be called for preemption
if (reservedContainer != null
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Fri Jan 3 07:26:52 2014
@@ -24,11 +24,15 @@ public enum SchedulerEventType {
NODE_ADDED,
NODE_REMOVED,
NODE_UPDATE,
-
- // Source: App
+
+ // Source: RMApp
APP_ADDED,
APP_REMOVED,
+ // Source: RMAppAttempt
+ APP_ATTEMPT_ADDED,
+ APP_ATTEMPT_REMOVED,
+
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED
}
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Jan 3 07:26:52 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Jan 3 07:26:52 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FSSchedulerApp extends SchedulerApplication {
+public class FSSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 3 07:26:52 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,10 +59,13 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -76,6 +80,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -151,10 +157,15 @@ public class FairScheduler implements Re
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
- // This stores per-application scheduling information, indexed by
+ // This stores per-application scheduling information, indexed by application ID.
+ @VisibleForTesting
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
+
+ // This stores per-application-attempt scheduling information, indexed by
// attempt ID's for fast lookup.
@VisibleForTesting
- protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ protected Map<ApplicationAttemptId, FSSchedulerApp> appAttempts =
new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
@@ -253,7 +264,7 @@ public class FairScheduler implements Re
private RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application =
- applications.get(containerId.getApplicationAttemptId());
+ appAttempts.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -591,44 +602,63 @@ public class FairScheduler implements Re
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void addApplication(
- ApplicationAttemptId applicationAttemptId, String queueName, String user) {
+ protected synchronized void addApplication(ApplicationId applicationId,
+ String queueName, String user) {
if (queueName == null || queueName.isEmpty()) {
- String message = "Reject application " + applicationAttemptId +
+ String message = "Reject application " + applicationId +
" submitted by user " + user + " with an empty queue name.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, message));
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
return;
}
- RMApp rmApp = rmContext.getRMApps().get(
- applicationAttemptId.getApplicationId());
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) {
rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId,
+ new RMAppRejectedEvent(applicationId,
"Application rejected by queue placement policy"));
return;
}
- FSSchedulerApp schedulerApp =
- new FSSchedulerApp(applicationAttemptId, user,
- queue, new ActiveUsersManager(getRootQueueMetrics()),
- rmContext);
-
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
&& !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
String msg = "User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName();
+ " cannot submit applications to queue " + queue.getName();
LOG.info(msg);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptRejectedEvent(applicationAttemptId, msg));
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, msg));
return;
}
+
+ SchedulerApplication application =
+ new SchedulerApplication(queue, user);
+ applications.put(applicationId, application);
+
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queueName);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
+
+ /**
+ * Add a new application attempt to the scheduler.
+ */
+ protected synchronized void addApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication application =
+ applications.get(applicationAttemptId.getApplicationId());
+ String user = application.getUser();
+ FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, user,
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
+ rmContext);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
queue.addApp(schedulerApp, runnable);
@@ -639,16 +669,14 @@ public class FairScheduler implements Re
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
+ appAttempts.put(applicationAttemptId, schedulerApp);
- applications.put(applicationAttemptId, schedulerApp);
-
- LOG.info("Application Submission: " + applicationAttemptId +
- ", user: "+ user +
- ", currently active: " + applications.size());
-
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user: " + user + ", currently active: "
+ + appAttempts.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
@VisibleForTesting
@@ -674,13 +702,18 @@ public class FairScheduler implements Re
return queue;
}
- private synchronized void removeApplication(
+ private synchronized void removeApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ applications.remove(applicationId);
+ }
+
+ private synchronized void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@@ -720,7 +753,7 @@ public class FairScheduler implements Re
}
// Remove from our data-structure
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
/**
@@ -737,7 +770,7 @@ public class FairScheduler implements Re
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@@ -811,7 +844,7 @@ public class FairScheduler implements Re
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
- FSSchedulerApp application = applications.get(appAttemptId);
+ FSSchedulerApp application = appAttempts.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@@ -882,7 +915,7 @@ public class FairScheduler implements Re
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- FSSchedulerApp application = applications.get(applicationAttemptId);
+ FSSchedulerApp application = appAttempts.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@@ -1025,23 +1058,23 @@ public class FairScheduler implements Re
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
- return applications.get(appAttemptId);
+ return appAttempts.get(appAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- if (!applications.containsKey(appAttemptId)) {
+ if (!appAttempts.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(applications.get(appAttemptId));
+ return new SchedulerAppReport(appAttempts.get(appAttemptId));
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
- FSSchedulerApp app = applications.get(appAttemptId);
+ FSSchedulerApp app = appAttempts.get(appAttemptId);
if (app == null) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
@@ -1094,18 +1127,34 @@ public class FairScheduler implements Re
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
- String queue = appAddedEvent.getQueue();
- addApplication(appAddedEvent.getApplicationAttemptId(), queue,
- appAddedEvent.getUser());
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
break;
case APP_REMOVED:
if (!(event instanceof AppRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- removeApplication(appRemovedEvent.getApplicationAttemptID(),
- appRemovedEvent.getFinalAttemptState());
+ removeApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ break;
+ case APP_ATTEMPT_ADDED:
+ if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+ (AppAttemptAddedSchedulerEvent) event;
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ break;
+ case APP_ATTEMPT_REMOVED:
+ if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+ (AppAttemptRemovedSchedulerEvent) event;
+ removeApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState());
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jan 3 07:26:52 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -38,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -59,6 +59,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -67,10 +70,22 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -108,11 +123,15 @@ public class FifoScheduler implements Re
private Resource maximumAllocation;
private boolean usePortForNodeName;
+ @VisibleForTesting
+ protected Map<ApplicationId, SchedulerApplication> applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
+
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
- protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> appAttempts
= new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
-
+
private ActiveUsersManager activeUsersManager;
private static final String DEFAULT_QUEUE_NAME = "default";
@@ -319,7 +338,7 @@ public class FifoScheduler implements Re
@VisibleForTesting
FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
- return applications.get(applicationAttemptId);
+ return appAttempts.get(applicationAttemptId);
}
@Override
@@ -339,23 +358,47 @@ public class FifoScheduler implements Re
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
- private synchronized void addApplication(ApplicationAttemptId appAttemptId,
- String user) {
+
+ private synchronized void addApplication(ApplicationId applicationId,
+ String queue, String user) {
+ SchedulerApplication application =
+ new SchedulerApplication(null, user);
+ applications.put(applicationId, application);
+ LOG.info("Accepted application " + applicationId + " from user: " + user);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
+
+ private synchronized void addApplicationAttempt(
+ ApplicationAttemptId appAttemptId) {
+ SchedulerApplication application =
+ applications.get(appAttemptId.getApplicationId());
+ String user = application.getUser();
// TODO: Fix store
- FiCaSchedulerApp schedulerApp =
- new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
- this.rmContext);
- applications.put(appAttemptId, schedulerApp);
+ FiCaSchedulerApp schedulerApp =
+ new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
+ activeUsersManager, this.rmContext);
+ appAttempts.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
- LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
- " from " + user + ", currently active: " + applications.size());
+ LOG.info("Added Application Attempt " + appAttemptId
+ + " to scheduler from user " + application.getUser()
+ + ", currently active: " + appAttempts.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.APP_ACCEPTED));
+ RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+
+ private synchronized void doneApplication(ApplicationId applicationId,
+ RMAppState finalState) {
+ SchedulerApplication application = applications.get(applicationId);
+
+ // Inform the activeUsersManager
+ activeUsersManager.deactivateApplication(application.getUser(),
+ applicationId);
+ applications.remove(applicationId);
}
- private synchronized void doneApplication(
+ private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState)
throws IOException {
@@ -374,17 +417,11 @@ public class FifoScheduler implements Re
RMContainerEventType.KILL);
}
- // Inform the activeUsersManager
- synchronized (application) {
- activeUsersManager.deactivateApplication(
- application.getUser(), application.getApplicationId());
- }
-
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
// Remove the application
- applications.remove(applicationAttemptId);
+ appAttempts.remove(applicationAttemptId);
}
/**
@@ -395,10 +432,10 @@ public class FifoScheduler implements Re
private void assignContainers(FiCaSchedulerNode node) {
LOG.debug("assignContainers:" +
" node=" + node.getRMNode().getNodeAddress() +
- " #applications=" + applications.size());
+ " #applications=" + appAttempts.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
+ for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : appAttempts
.entrySet()) {
FiCaSchedulerApp application = e.getValue();
LOG.debug("pre-assignContainers");
@@ -437,7 +474,7 @@ public class FifoScheduler implements Re
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (FiCaSchedulerApp application : applications.values()) {
+ for (FiCaSchedulerApp application : appAttempts.values()) {
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
}
}
@@ -692,19 +729,35 @@ public class FifoScheduler implements Re
case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
- addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
- .getUser());
+ addApplication(appAddedEvent.getApplicationId(),
+ appAddedEvent.getQueue(), appAddedEvent.getUser());
}
break;
case APP_REMOVED:
{
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
+ doneApplication(appRemovedEvent.getApplicationID(),
+ appRemovedEvent.getFinalState());
+ }
+ break;
+ case APP_ATTEMPT_ADDED:
+ {
+ AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+ (AppAttemptAddedSchedulerEvent) event;
+ addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId());
+ }
+ break;
+ case APP_ATTEMPT_REMOVED:
+ {
+ AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+ (AppAttemptRemovedSchedulerEvent) event;
try {
- doneApplication(appRemovedEvent.getApplicationAttemptID(),
- appRemovedEvent.getFinalAttemptState());
+ doneApplicationAttempt(
+ appAttemptRemovedEvent.getApplicationAttemptID(),
+ appAttemptRemovedEvent.getFinalAttemptState());
} catch(IOException ie) {
LOG.error("Unable to remove application "
- + appRemovedEvent.getApplicationAttemptID(), ie);
+ + appAttemptRemovedEvent.getApplicationAttemptID(), ie);
}
}
break;
@@ -856,8 +909,8 @@ public class FifoScheduler implements Re
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
- applications.size());
- for (FiCaSchedulerApp app : applications.values()) {
+ appAttempts.size());
+ for (FiCaSchedulerApp app : appAttempts.values()) {
apps.add(app.getApplicationAttemptId());
}
return apps;
Modified: hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java Fri Jan 3 07:26:52 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -164,11 +165,14 @@ public class Application {
final ResourceScheduler scheduler = resourceManager.getResourceScheduler();
resourceManager.getClientRMService().submitApplication(request);
-
+
// Notify scheduler
- AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
- this.applicationAttemptId, this.queue, this.user);
- scheduler.handle(appAddedEvent1);
+ AppAddedSchedulerEvent addAppEvent =
+ new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
+ scheduler.handle(addAppEvent);
+ AppAttemptAddedSchedulerEvent addAttemptEvent =
+ new AppAttemptAddedSchedulerEvent(this.applicationAttemptId);
+ scheduler.handle(addAttemptEvent);
}
public synchronized void addResourceRequestSpec(