You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/02/07 21:33:02 UTC
svn commit: r1565792 [2/5] - in
/hadoop/common/branches/branch-2.3/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/record...
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Feb 7 20:33:01 2014
@@ -59,10 +59,10 @@ public class AppSchedulingInfo {
final Set<Priority> priorities = new TreeSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
- final Map<Priority, Map<String, ResourceRequest>> requests =
+ final Map<Priority, Map<String, ResourceRequest>> requests =
new HashMap<Priority, Map<String, ResourceRequest>>();
- private Set<String> blacklist = new HashSet<String>();
-
+ final Set<String> blacklist = new HashSet<String>();
+
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
@@ -260,7 +260,7 @@ public class AppSchedulingInfo {
// once an allocation is done we assume the application is
// running from scheduler's POV.
pending = false;
- metrics.runAppAttempt(applicationId, user);
+ metrics.incrAppsRunning(this, user);
}
LOG.debug("allocate: user: " + user + ", memory: "
+ request.getCapability());
@@ -390,7 +390,7 @@ public class AppSchedulingInfo {
.getNumContainers()));
}
}
- metrics.finishAppAttempt(applicationId, pending, user);
+ metrics.finishApp(this, rmAppAttemptFinalState);
// Clear requests themselves
clearRequests();
@@ -399,15 +399,4 @@ public class AppSchedulingInfo {
public synchronized void setQueue(Queue queue) {
this.queue = queue;
}
-
- public synchronized Set<String> getBlackList() {
- return this.blacklist;
- }
-
- public synchronized void transferStateFromPreviousAppSchedulingInfo(
- AppSchedulingInfo appInfo) {
- // this.priorities = appInfo.getPriorities();
- // this.requests = appInfo.getRequests();
- this.blacklist = appInfo.getBlackList();
- }
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Fri Feb 7 20:33:01 2014
@@ -41,7 +41,7 @@ import org.apache.hadoop.metrics2.lib.Mu
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
@@ -57,7 +57,7 @@ public class QueueMetrics implements Met
@Metric("# of pending apps") MutableGaugeInt appsPending;
@Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled;
- @Metric("# of apps failed") MutableCounterInt appsFailed;
+ @Metric("# of apps failed") MutableGaugeInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@@ -214,70 +214,54 @@ public class QueueMetrics implements Met
registry.snapshot(collector.addRecord(registry.info()), all);
}
- public void submitApp(String user) {
- appsSubmitted.incr();
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.submitApp(user);
- }
- if (parent != null) {
- parent.submitApp(user);
+ public void submitApp(String user, int attemptId) {
+ if (attemptId == 1) {
+ appsSubmitted.incr();
+ } else {
+ appsFailed.decr();
}
- }
-
- public void submitAppAttempt(String user) {
appsPending.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.submitAppAttempt(user);
+ userMetrics.submitApp(user, attemptId);
}
if (parent != null) {
- parent.submitAppAttempt(user);
+ parent.submitApp(user, attemptId);
}
}
- public void runAppAttempt(ApplicationId appId, String user) {
- runBuckets.add(appId, System.currentTimeMillis());
+ public void incrAppsRunning(AppSchedulingInfo app, String user) {
+ runBuckets.add(app.getApplicationId(), System.currentTimeMillis());
appsRunning.incr();
appsPending.decr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
- userMetrics.runAppAttempt(appId, user);
+ userMetrics.incrAppsRunning(app, user);
}
if (parent != null) {
- parent.runAppAttempt(appId, user);
+ parent.incrAppsRunning(app, user);
}
}
- public void finishAppAttempt(
- ApplicationId appId, boolean isPending, String user) {
- runBuckets.remove(appId);
- if (isPending) {
- appsPending.decr();
- } else {
- appsRunning.decr();
- }
- QueueMetrics userMetrics = getUserMetrics(user);
- if (userMetrics != null) {
- userMetrics.finishAppAttempt(appId, isPending, user);
- }
- if (parent != null) {
- parent.finishAppAttempt(appId, isPending, user);
- }
- }
-
- public void finishApp(String user, RMAppState rmAppFinalState) {
- switch (rmAppFinalState) {
+ public void finishApp(AppSchedulingInfo app,
+ RMAppAttemptState rmAppAttemptFinalState) {
+ runBuckets.remove(app.getApplicationId());
+ switch (rmAppAttemptFinalState) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}
- QueueMetrics userMetrics = getUserMetrics(user);
+ if (app.isPending()) {
+ appsPending.decr();
+ } else {
+ appsRunning.decr();
+ }
+ QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
- userMetrics.finishApp(user, rmAppFinalState);
+ userMetrics.finishApp(app, rmAppAttemptFinalState);
}
if (parent != null) {
- parent.finishApp(user, rmAppFinalState);
+ parent.finishApp(app, rmAppAttemptFinalState);
}
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Fri Feb 7 20:33:01 2014
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
private final Collection<RMContainer> reserved;
private final boolean pending;
- public SchedulerAppReport(SchedulerApplicationAttempt app) {
+ public SchedulerAppReport(SchedulerApplication app) {
this.live = app.getLiveContainers();
this.reserved = app.getReservedContainers();
this.pending = app.isPending();
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppUtils.java Fri Feb 7 20:33:01 2014
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
public class SchedulerAppUtils {
- public static boolean isBlacklisted(SchedulerApplicationAttempt application,
+ public static boolean isBlacklisted(SchedulerApplication application,
SchedulerNode node, Log LOG) {
if (application.isBlacklisted(node.getNodeName())) {
if (LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Fri Feb 7 20:33:01 2014
@@ -17,41 +17,393 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+/**
+ * Represents an application attempt from the viewpoint of the scheduler.
+ * Each running app attempt in the RM corresponds to one instance
+ * of this class.
+ */
@Private
@Unstable
-public class SchedulerApplication {
+public abstract class SchedulerApplication {
+
+ private static final Log LOG = LogFactory.getLog(SchedulerApplication.class);
+
+ protected final AppSchedulingInfo appSchedulingInfo;
+
+ protected final Map<ContainerId, RMContainer> liveContainers =
+ new HashMap<ContainerId, RMContainer>();
+ protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+ new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+ private final Multiset<Priority> reReservations = HashMultiset.create();
+
+ protected final Resource currentReservation = Resource.newInstance(0, 0);
+ private Resource resourceLimit = Resource.newInstance(0, 0);
+ protected final Resource currentConsumption = Resource.newInstance(0, 0);
- private final Queue queue;
- private final String user;
- private SchedulerApplicationAttempt currentAttempt;
+ protected List<RMContainer> newlyAllocatedContainers =
+ new ArrayList<RMContainer>();
- public SchedulerApplication(Queue queue, String user) {
+ /**
+ * Count how many times the application has been given an opportunity
+ * to schedule a task at each priority. Each time the scheduler
+ * asks the application for a task at this priority, it is incremented,
+ * and each time the application successfully schedules a task, it
+ * is reset to 0.
+ */
+ Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+
+ // Time of the last container scheduled at the current allowed level
+ protected Map<Priority, Long> lastScheduledContainer =
+ new HashMap<Priority, Long>();
+
+ protected final Queue queue;
+ protected boolean isStopped = false;
+
+ protected final RMContext rmContext;
+
+ public SchedulerApplication(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext) {
+ this.rmContext = rmContext;
+ this.appSchedulingInfo =
+ new AppSchedulingInfo(applicationAttemptId, user, queue,
+ activeUsersManager);
this.queue = queue;
- this.user = user;
+ }
+
+ /**
+ * Get the live containers of the application.
+ * @return live containers of the application
+ */
+ public synchronized Collection<RMContainer> getLiveContainers() {
+ return new ArrayList<RMContainer>(liveContainers.values());
+ }
+
+ /**
+ * Is this application pending?
+ * @return true if it is else false.
+ */
+ public boolean isPending() {
+ return appSchedulingInfo.isPending();
+ }
+
+ /**
+ * Get {@link ApplicationAttemptId} of the application master.
+ * @return <code>ApplicationAttemptId</code> of the application master
+ */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appSchedulingInfo.getApplicationAttemptId();
+ }
+
+ public ApplicationId getApplicationId() {
+ return appSchedulingInfo.getApplicationId();
+ }
+
+ public String getUser() {
+ return appSchedulingInfo.getUser();
+ }
+
+ public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+ return appSchedulingInfo.getResourceRequests(priority);
+ }
+
+ public int getNewContainerId() {
+ return appSchedulingInfo.getNewContainerId();
+ }
+
+ public Collection<Priority> getPriorities() {
+ return appSchedulingInfo.getPriorities();
+ }
+
+ public ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+ return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
+ }
+
+ public synchronized int getTotalRequiredResources(Priority priority) {
+ return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
+ }
+
+ public Resource getResource(Priority priority) {
+ return appSchedulingInfo.getResource(priority);
+ }
+
+ public String getQueueName() {
+ return appSchedulingInfo.getQueueName();
+ }
+
+ public synchronized RMContainer getRMContainer(ContainerId id) {
+ return liveContainers.get(id);
+ }
+
+ protected synchronized void resetReReservations(Priority priority) {
+ reReservations.setCount(priority, 0);
}
+ protected synchronized void addReReservation(Priority priority) {
+ reReservations.add(priority);
+ }
+
+ public synchronized int getReReservations(Priority priority) {
+ return reReservations.count(priority);
+ }
+
+ /**
+ * Get total current reservations.
+ * Used only by unit tests
+ * @return total current reservations
+ */
+ @Stable
+ @Private
+ public synchronized Resource getCurrentReservation() {
+ return currentReservation;
+ }
+
public Queue getQueue() {
return queue;
}
+
+ public synchronized void updateResourceRequests(
+ List<ResourceRequest> requests) {
+ if (!isStopped) {
+ appSchedulingInfo.updateResourceRequests(requests);
+ }
+ }
+
+ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ // Cleanup all scheduling information
+ isStopped = true;
+ appSchedulingInfo.stop(rmAppAttemptFinalState);
+ }
- public String getUser() {
- return user;
+ public synchronized boolean isStopped() {
+ return isStopped;
}
- public SchedulerApplicationAttempt getCurrentAppAttempt() {
- return currentAttempt;
+ /**
+ * Get the list of reserved containers
+ * @return All of the reserved containers.
+ */
+ public synchronized List<RMContainer> getReservedContainers() {
+ List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+ for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
+ this.reservedContainers.entrySet()) {
+ reservedContainers.addAll(e.getValue().values());
+ }
+ return reservedContainers;
+ }
+
+ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+ RMContainer rmContainer, Container container) {
+ // Create RMContainer if necessary
+ if (rmContainer == null) {
+ rmContainer =
+ new RMContainerImpl(container, getApplicationAttemptId(),
+ node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+ rmContext.getContainerAllocationExpirer());
+
+ Resources.addTo(currentReservation, container.getResource());
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+ } else {
+ // Note down the re-reservation
+ addReReservation(priority);
+ }
+ rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+ container.getResource(), node.getNodeID(), priority));
+
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers == null) {
+ reservedContainers = new HashMap<NodeId, RMContainer>();
+ this.reservedContainers.put(priority, reservedContainers);
+ }
+ reservedContainers.put(node.getNodeID(), rmContainer);
+
+ LOG.info("Application " + getApplicationId()
+ + " reserved container " + rmContainer
+ + " on node " + node + ", currently has " + reservedContainers.size()
+ + " at priority " + priority
+ + "; currentReservation " + currentReservation.getMemory());
+
+ return rmContainer;
+ }
+
+ /**
+ * Has the application reserved the given <code>node</code> at the
+ * given <code>priority</code>?
+ * @param node node to be checked
+ * @param priority priority of reserved container
+ * @return true is reserved, false if not
+ */
+ public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers != null) {
+ return reservedContainers.containsKey(node.getNodeID());
+ }
+ return false;
+ }
+
+ public synchronized void setHeadroom(Resource globalLimit) {
+ this.resourceLimit = globalLimit;
}
- public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
- this.currentAttempt = currentAttempt;
+ /**
+ * Get available headroom in terms of resources for the application's user.
+ * @return available resource headroom
+ */
+ public synchronized Resource getHeadroom() {
+ // Corner case to deal with applications being slightly over-limit
+ if (resourceLimit.getMemory() < 0) {
+ resourceLimit.setMemory(0);
+ }
+
+ return resourceLimit;
}
+
+ public synchronized int getNumReservedContainers(Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ return (reservedContainers == null) ? 0 : reservedContainers.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void containerLaunchedOnNode(ContainerId containerId,
+ NodeId nodeId) {
+ // Inform the container
+ RMContainer rmContainer = getRMContainer(containerId);
+ if (rmContainer == null) {
+ // Some unknown container sneaked into the system. Kill it.
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+ return;
+ }
- public void stop(RMAppState rmAppFinalState) {
- queue.getMetrics().finishApp(user, rmAppFinalState);
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.LAUNCHED));
+ }
+
+ public synchronized void showRequests() {
+ if (LOG.isDebugEnabled()) {
+ for (Priority priority : getPriorities()) {
+ Map<String, ResourceRequest> requests = getResourceRequests(priority);
+ if (requests != null) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId() +
+ " headRoom=" + getHeadroom() +
+ " currentConsumption=" + currentConsumption.getMemory());
+ for (ResourceRequest request : requests.values()) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId()
+ + " request=" + request);
+ }
+ }
+ }
+ }
+ }
+
+ public Resource getCurrentConsumption() {
+ return currentConsumption;
+ }
+
+ public synchronized List<Container> pullNewlyAllocatedContainers() {
+ List<Container> returnContainerList = new ArrayList<Container>(
+ newlyAllocatedContainers.size());
+ for (RMContainer rmContainer : newlyAllocatedContainers) {
+ rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+ RMContainerEventType.ACQUIRED));
+ returnContainerList.add(rmContainer.getContainer());
+ }
+ newlyAllocatedContainers.clear();
+ return returnContainerList;
+ }
+
+ public synchronized void updateBlacklist(
+ List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ if (!isStopped) {
+ this.appSchedulingInfo.updateBlacklist(
+ blacklistAdditions, blacklistRemovals);
+ }
+ }
+
+ public boolean isBlacklisted(String resourceName) {
+ return this.appSchedulingInfo.isBlacklisted(resourceName);
+ }
+
+ public synchronized void addSchedulingOpportunity(Priority priority) {
+ schedulingOpportunities.setCount(priority,
+ schedulingOpportunities.count(priority) + 1);
+ }
+
+ public synchronized void subtractSchedulingOpportunity(Priority priority) {
+ int count = schedulingOpportunities.count(priority) - 1;
+ this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
+ }
+
+ /**
+ * Return the number of times the application has been given an opportunity
+ * to schedule a task at the given priority since the last time it
+ * successfully did so.
+ */
+ public synchronized int getSchedulingOpportunities(Priority priority) {
+ return schedulingOpportunities.count(priority);
+ }
+
+ /**
+ * Should be called when an application has successfully scheduled a container,
+ * or when the scheduling locality threshold is relaxed.
+ * Reset various internal counters which affect delay scheduling
+ *
+ * @param priority The priority of the container scheduled.
+ */
+ public synchronized void resetSchedulingOpportunities(Priority priority) {
+ resetSchedulingOpportunities(priority, System.currentTimeMillis());
+ }
+ // used for continuous scheduling
+ public synchronized void resetSchedulingOpportunities(Priority priority,
+ long currentTimeMs) {
+ lastScheduledContainer.put(priority, currentTimeMs);
+ schedulingOpportunities.setCount(priority, 0);
+ }
+
+ public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
+ return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+ reservedContainers.size(), Resources.clone(currentConsumption),
+ Resources.clone(currentReservation),
+ Resources.add(currentConsumption, currentReservation));
}
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Fri Feb 7 20:33:01 2014
@@ -19,13 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
/**
@@ -171,13 +170,4 @@ public interface YarnScheduler extends E
@LimitedPrivate("yarn")
@Stable
public List<ApplicationAttemptId> getAppsInQueue(String queueName);
-
- /**
- * Get the container for the given containerId.
- * @param containerId
- * @return the container for the given containerId.
- */
- @LimitedPrivate("yarn")
- @Unstable
- public RMContainer getRMContainer(ContainerId containerId);
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Fri Feb 7 20:33:01 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -155,32 +155,21 @@ extends org.apache.hadoop.yarn.server.re
/**
* Submit a new application to the queue.
- * @param applicationId the applicationId of the application being submitted
+ * @param application application being submitted
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
- public void submitApplication(ApplicationId applicationId, String user,
- String queue) throws AccessControlException;
-
- /**
- * Submit an application attempt to the queue.
- */
- public void submitApplicationAttempt(FiCaSchedulerApp application,
- String userName);
-
+ public void submitApplication(FiCaSchedulerApp application, String user,
+ String queue)
+ throws AccessControlException;
+
/**
* An application submitted to this queue has finished.
- * @param applicationId
- * @param user user who submitted the application
+ * @param application
+ * @param queue application queue
*/
- public void finishApplication(ApplicationId applicationId, String user);
-
- /**
- * An application attempt submitted to this queue has finished.
- */
- public void finishApplicationAttempt(FiCaSchedulerApp application,
- String queue);
-
+ public void finishApplication(FiCaSchedulerApp application, String queue);
+
/**
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Feb 7 20:33:01 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,20 +53,15 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -77,10 +71,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -96,7 +88,7 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class CapacityScheduler extends AbstractYarnScheduler
+public class CapacityScheduler
implements PreemptableResourceScheduler, CapacitySchedulerContext,
Configurable {
@@ -178,6 +170,7 @@ public class CapacityScheduler extends A
private CapacitySchedulerConfiguration conf;
private Configuration yarnConf;
+ private RMContext rmContext;
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
@@ -191,6 +184,10 @@ public class CapacityScheduler extends A
private Resource minimumAllocation;
private Resource maximumAllocation;
+ @VisibleForTesting
+ protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
+ new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
+
private boolean initialized = false;
private ResourceCalculator calculator;
@@ -267,10 +264,9 @@ public class CapacityScheduler extends A
this.maximumAllocation = this.conf.getMaximumAllocation();
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
- this.applications =
- new ConcurrentHashMap<ApplicationId, SchedulerApplication>();
- this.rmContext = rmContext;
+ this.rmContext = rmContext;
+
initializeQueues(this.conf);
initialized = true;
@@ -419,141 +415,105 @@ public class CapacityScheduler extends A
synchronized CSQueue getQueue(String queueName) {
return queues.get(queueName);
}
+
+ private synchronized void
+ addApplicationAttempt(ApplicationAttemptId applicationAttemptId,
+ String queueName, String user) {
- private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user) {
- // santiy checks.
+ // Sanity checks
CSQueue queue = getQueue(queueName);
if (queue == null) {
- String message = "Application " + applicationId +
+ String message = "Application " + applicationAttemptId +
" submitted by user " + user + " to unknown queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, message));
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId, message));
return;
}
if (!(queue instanceof LeafQueue)) {
- String message = "Application " + applicationId +
+ String message = "Application " + applicationAttemptId +
" submitted by user " + user + " to non-leaf queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, message));
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId, message));
return;
}
+
+ // TODO: Fix store
+ FiCaSchedulerApp SchedulerApp =
+ new FiCaSchedulerApp(applicationAttemptId, user, queue,
+ queue.getActiveUsersManager(), rmContext);
+
// Submit to the queue
try {
- queue.submitApplication(applicationId, user, queueName);
+ queue.submitApplication(SchedulerApp, user, queueName);
} catch (AccessControlException ace) {
- LOG.info("Failed to submit application " + applicationId + " to queue "
- + queueName + " from user " + user, ace);
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, ace.toString()));
+ LOG.info("Failed to submit application " + applicationAttemptId +
+ " to queue " + queueName + " from user " + user, ace);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId,
+ ace.toString()));
return;
}
- SchedulerApplication application =
- new SchedulerApplication(queue, user);
- applications.put(applicationId, application);
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queueName);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
- }
- private synchronized void addApplicationAttempt(
- ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
- applications.get(applicationAttemptId.getApplicationId());
- CSQueue queue = (CSQueue) application.getQueue();
-
- FiCaSchedulerApp attempt =
- new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
- queue, queue.getActiveUsersManager(), rmContext);
- if (transferStateFromPreviousAttempt) {
- attempt.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(attempt);
-
- queue.submitApplicationAttempt(attempt, application.getUser());
- LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user " + application.getUser() + " in queue "
- + queue.getQueueName());
- rmContext.getDispatcher().getEventHandler() .handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
- }
+ applications.put(applicationAttemptId, SchedulerApp);
- private synchronized void doneApplication(ApplicationId applicationId,
- RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
- if (application == null){
- // The AppRemovedSchedulerEvent maybe sent on recovery for completed apps,
- // ignore it.
- LOG.warn("Couldn't find application " + applicationId);
- return;
- }
- CSQueue queue = (CSQueue) application.getQueue();
- if (!(queue instanceof LeafQueue)) {
- LOG.error("Cannot finish application " + "from non-leaf queue: "
- + queue.getQueueName());
- } else {
- queue.finishApplication(applicationId, application.getUser());
- }
- application.stop(finalState);
- applications.remove(applicationId);
+ LOG.info("Application Submission: " + applicationAttemptId +
+ ", user: " + user +
+ " queue: " + queue +
+ ", currently active: " + applications.size());
+
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.APP_ACCEPTED));
}
private synchronized void doneApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
- LOG.info("Application Attempt " + applicationAttemptId + " is done." +
+ RMAppAttemptState rmAppAttemptFinalState) {
+ LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
- applications.get(applicationAttemptId.getApplicationId());
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
- if (application == null || attempt == null) {
+ if (application == null) {
+ // throw new IOException("Unknown application " + applicationId +
+ // " has completed!");
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
-
- // Release all the allocated, acquired, running containers
- for (RMContainer rmContainer : attempt.getLiveContainers()) {
- if (keepContainers
- && rmContainer.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + rmContainer.getContainerId());
- continue;
- }
- completedContainer(
- rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+
+ // Release all the running containers
+ for (RMContainer rmContainer : application.getLiveContainers()) {
+ completedContainer(rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ rmContainer.getContainerId(),
+ SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
}
-
- // Release all reserved containers
- for (RMContainer rmContainer : attempt.getReservedContainers()) {
- completedContainer(
- rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(), "Application Complete"),
- RMContainerEventType.KILL);
+
+ // Release all reserved containers
+ for (RMContainer rmContainer : application.getReservedContainers()) {
+ completedContainer(rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ rmContainer.getContainerId(),
+ "Application Complete"),
+ RMContainerEventType.KILL);
}
-
+
// Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
-
+ application.stop(rmAppAttemptFinalState);
+
// Inform the queue
- String queueName = attempt.getQueue().getQueueName();
+ String queueName = application.getQueue().getQueueName();
CSQueue queue = queues.get(queueName);
if (!(queue instanceof LeafQueue)) {
LOG.error("Cannot finish application " + "from non-leaf queue: "
+ queueName);
} else {
- queue.finishApplicationAttempt(attempt, queue.getQueueName());
+ queue.finishApplication(application, queue.getQueueName());
}
+
+ // Remove from our data-structure
+ applications.remove(applicationAttemptId);
}
private static final Allocation EMPTY_ALLOCATION =
@@ -565,7 +525,7 @@ public class CapacityScheduler extends A
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
- FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -707,8 +667,8 @@ public class CapacityScheduler extends A
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- FiCaSchedulerApp reservedApplication =
- getCurrentAttemptForContainer(reservedContainer.getContainerId());
+ FiCaSchedulerApp reservedApplication =
+ getApplication(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
@@ -745,11 +705,12 @@ public class CapacityScheduler extends A
private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
+ ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
+ LOG.info("Unknown application: " + applicationAttemptId +
+ " launched container " + containerId +
+ " on node: " + node);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
return;
@@ -779,26 +740,12 @@ public class CapacityScheduler extends A
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
- case APP_ADDED:
- {
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
- addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
- }
- break;
- case APP_REMOVED:
- {
- AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- doneApplication(appRemovedEvent.getApplicationID(),
- appRemovedEvent.getFinalState());
- }
- break;
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getQueue(), appAttemptAddedEvent.getUser());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -806,8 +753,7 @@ public class CapacityScheduler extends A
AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
(AppAttemptRemovedSchedulerEvent) event;
doneApplicationAttempt(appAttemptRemovedEvent.getApplicationAttemptID(),
- appAttemptRemovedEvent.getFinalAttemptState(),
- appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
+ appAttemptRemovedEvent.getFinalAttemptState());
}
break;
case CONTAINER_EXPIRED:
@@ -882,13 +828,13 @@ public class CapacityScheduler extends A
Container container = rmContainer.getContainer();
// Get the application for the finished container
- FiCaSchedulerApp application =
- getCurrentAttemptForContainer(container.getId());
- ApplicationId appId =
- container.getId().getApplicationAttemptId().getApplicationId();
+ ApplicationAttemptId applicationAttemptId =
+ container.getId().getApplicationAttemptId();
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
- LOG.info("Container " + container + " of" + " unknown application "
- + appId + " completed with event " + event);
+ LOG.info("Container " + container + " of" +
+ " unknown application " + applicationAttemptId +
+ " completed with event " + event);
return;
}
@@ -900,33 +846,28 @@ public class CapacityScheduler extends A
queue.completedContainer(clusterResource, application, node,
rmContainer, containerStatus, event, null);
- LOG.info("Application attempt " + application.getApplicationAttemptId()
- + " released container " + container.getId() + " on node: " + node
- + " with event: " + event);
+ LOG.info("Application " + applicationAttemptId +
+ " released container " + container.getId() +
+ " on node: " + node +
+ " with event: " + event);
}
@Lock(Lock.NoLock.class)
- FiCaSchedulerApp getApplicationAttempt(
- ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+ FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+ return applications.get(applicationAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : app.getResourceUsageReport();
}
@@ -935,22 +876,10 @@ public class CapacityScheduler extends A
return nodes.get(nodeId);
}
- @Override
- public RMContainer getRMContainer(ContainerId containerId) {
- FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
- return (attempt == null) ? null : attempt.getRMContainer(containerId);
- }
-
- @VisibleForTesting
- public FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
+ private RMContainer getRMContainer(ContainerId containerId) {
+ FiCaSchedulerApp application =
+ getApplication(containerId.getApplicationAttemptId());
+ return (application == null) ? null : application.getRMContainer(containerId);
}
@Override
@@ -983,7 +912,7 @@ public class CapacityScheduler extends A
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
" container: " + cont.toString());
}
- FiCaSchedulerApp app = getApplicationAttempt(aid);
+ FiCaSchedulerApp app = applications.get(aid);
if (app != null) {
app.addPreemptContainer(cont.getContainerId());
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Fri Feb 7 20:33:01 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -99,7 +99,7 @@ public class LeafQueue implements CSQueu
private volatile int numContainers;
Set<FiCaSchedulerApp> activeApplications;
- Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
+ Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
Set<FiCaSchedulerApp> pendingApplications;
@@ -635,21 +635,7 @@ public class LeafQueue implements CSQueu
}
@Override
- public void submitApplicationAttempt(FiCaSchedulerApp application,
- String userName) {
- // Careful! Locking order is important!
- synchronized (this) {
- User user = getUser(userName);
- // Add the attempt to our data-structures
- addApplicationAttempt(application, user);
- }
-
- metrics.submitAppAttempt(userName);
- getParent().submitApplicationAttempt(application, userName);
- }
-
- @Override
- public void submitApplication(ApplicationId applicationId, String userName,
+ public void submitApplication(FiCaSchedulerApp application, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -667,7 +653,8 @@ public class LeafQueue implements CSQueu
// Check if the queue is accepting jobs
if (getState() != QueueState.RUNNING) {
String msg = "Queue " + getQueuePath() +
- " is STOPPED. Cannot accept submission of application: " + applicationId;
+ " is STOPPED. Cannot accept submission of application: " +
+ application.getApplicationId();
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -676,7 +663,8 @@ public class LeafQueue implements CSQueu
if (getNumApplications() >= getMaxApplications()) {
String msg = "Queue " + getQueuePath() +
" already has " + getNumApplications() + " applications," +
- " cannot accept submission of application: " + applicationId;
+ " cannot accept submission of application: " +
+ application.getApplicationId();
LOG.info(msg);
throw new AccessControlException(msg);
}
@@ -687,22 +675,28 @@ public class LeafQueue implements CSQueu
String msg = "Queue " + getQueuePath() +
" already has " + user.getTotalApplications() +
" applications from user " + userName +
- " cannot accept submission of application: " + applicationId;
+ " cannot accept submission of application: " +
+ application.getApplicationId();
LOG.info(msg);
throw new AccessControlException(msg);
}
+
+ // Add the application to our data-structures
+ addApplication(application, user);
}
+ int attemptId = application.getApplicationAttemptId().getAttemptId();
+ metrics.submitApp(userName, attemptId);
+
// Inform the parent queue
try {
- getParent().submitApplication(applicationId, userName, queue);
+ getParent().submitApplication(application, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
getParent().getQueuePath(), ace);
+ removeApplication(application, user);
throw ace;
}
-
- metrics.submitApp(userName);
}
private synchronized void activateApplications() {
@@ -728,11 +722,11 @@ public class LeafQueue implements CSQueu
}
}
- private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
+ private synchronized void addApplication(FiCaSchedulerApp application, User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
- applicationAttemptMap.put(application.getApplicationAttemptId(), application);
+ applicationsMap.put(application.getApplicationAttemptId(), application);
// Activate applications
activateApplications();
@@ -748,28 +742,22 @@ public class LeafQueue implements CSQueu
}
@Override
- public void finishApplication(ApplicationId application, String user) {
- // Inform the activeUsersManager
- activeUsersManager.deactivateApplication(user, application);
- // Inform the parent queue
- getParent().finishApplication(application, user);
- }
-
- @Override
- public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
+ public void finishApplication(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
- removeApplicationAttempt(application, getUser(application.getUser()));
+ removeApplication(application, getUser(application.getUser()));
}
- getParent().finishApplicationAttempt(application, queue);
+
+ // Inform the parent queue
+ getParent().finishApplication(application, queue);
}
- public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
+ public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
}
- applicationAttemptMap.remove(application.getApplicationAttemptId());
+ applicationsMap.remove(application.getApplicationAttemptId());
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
@@ -778,7 +766,13 @@ public class LeafQueue implements CSQueu
// Check if we can activate more applications
activateApplications();
-
+
+ // Inform the activeUsersManager
+ synchronized (application) {
+ activeUsersManager.deactivateApplication(
+ application.getUser(), application.getApplicationId());
+ }
+
LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
" user: " + application.getUser() +
@@ -789,10 +783,10 @@ public class LeafQueue implements CSQueu
" #queue-active-applications: " + getNumActiveApplications()
);
}
-
+
private synchronized FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
- return applicationAttemptMap.get(applicationAttemptId);
+ return applicationsMap.get(applicationAttemptId);
}
private static final CSAssignment NULL_ASSIGNMENT =
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Fri Feb 7 20:33:01 2014
@@ -37,7 +37,6 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -442,7 +442,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void submitApplication(ApplicationId applicationId, String user,
+ public void submitApplication(FiCaSchedulerApp application, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -455,70 +455,57 @@ public class ParentQueue implements CSQu
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
- applicationId);
+ application.getApplicationId());
}
- addApplication(applicationId, user);
+ addApplication(application, user);
}
// Inform the parent queue
if (parent != null) {
try {
- parent.submitApplication(applicationId, user, queue);
+ parent.submitApplication(application, user, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
- removeApplication(applicationId, user);
+ removeApplication(application, user);
throw ace;
}
}
}
-
- @Override
- public void submitApplicationAttempt(FiCaSchedulerApp application,
- String userName) {
- // submit attempt logic.
- }
-
- @Override
- public void finishApplicationAttempt(FiCaSchedulerApp application,
- String queue) {
- // finish attempt logic.
- }
-
- private synchronized void addApplication(ApplicationId applicationId,
+ private synchronized void addApplication(FiCaSchedulerApp application,
String user) {
-
+
++numApplications;
LOG.info("Application added -" +
- " appId: " + applicationId +
+ " appId: " + application.getApplicationId() +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
@Override
- public void finishApplication(ApplicationId application, String user) {
+ public void finishApplication(FiCaSchedulerApp application, String queue) {
synchronized (this) {
- removeApplication(application, user);
+ removeApplication(application, application.getUser());
}
// Inform the parent queue
if (parent != null) {
- parent.finishApplication(application, user);
+ parent.finishApplication(application, queue);
}
}
- public synchronized void removeApplication(ApplicationId applicationId,
+ public synchronized void removeApplication(FiCaSchedulerApp application,
String user) {
--numApplications;
LOG.info("Application removed -" +
- " appId: " + applicationId +
+ " appId: " + application.getApplicationId() +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Feb 7 20:33:01 2014
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -57,7 +57,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
+public class FiCaSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java Fri Feb 7 20:33:01 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -206,7 +206,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void reserveResource(
- SchedulerApplicationAttempt application, Priority priority,
+ SchedulerApplication application, Priority priority,
RMContainer reservedContainer) {
// Check if it's already reserved
if (this.reservedContainer != null) {
@@ -219,8 +219,7 @@ public class FiCaSchedulerNode extends S
" on node " + this.reservedContainer.getReservedNode());
}
- // Cannot reserve more than one application attempt on a given node!
- // Reservation is still against attempt.
+ // Cannot reserve more than one application on a given node!
if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
reservedContainer.getContainer().getId().getApplicationAttemptId())) {
throw new IllegalStateException("Trying to reserve" +
@@ -242,7 +241,7 @@ public class FiCaSchedulerNode extends S
}
public synchronized void unreserveResource(
- SchedulerApplicationAttempt application) {
+ SchedulerApplication application) {
// adding NP checks as this can now be called for preemption
if (reservedContainer != null
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Feb 7 20:33:01 2014
@@ -23,21 +23,27 @@ import org.apache.hadoop.yarn.api.record
public class AppAttemptAddedSchedulerEvent extends SchedulerEvent {
private final ApplicationAttemptId applicationAttemptId;
- private final boolean transferStateFromPreviousAttempt;
+ private final String queue;
+ private final String user;
public AppAttemptAddedSchedulerEvent(
- ApplicationAttemptId applicationAttemptId,
- boolean transferStateFromPreviousAttempt) {
+ ApplicationAttemptId applicationAttemptId, String queue, String user) {
super(SchedulerEventType.APP_ATTEMPT_ADDED);
this.applicationAttemptId = applicationAttemptId;
- this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
+ this.queue = queue;
+ this.user = user;
}
public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId;
}
- public boolean getTransferStateFromPreviousAttempt() {
- return transferStateFromPreviousAttempt;
+ public String getQueue() {
+ return queue;
}
+
+ public String getUser() {
+ return user;
+ }
+
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptRemovedSchedulerEvent.java Fri Feb 7 20:33:01 2014
@@ -25,15 +25,13 @@ public class AppAttemptRemovedSchedulerE
private final ApplicationAttemptId applicationAttemptId;
private final RMAppAttemptState finalAttemptState;
- private final boolean keepContainersAcrossAppAttempts;
public AppAttemptRemovedSchedulerEvent(
ApplicationAttemptId applicationAttemptId,
- RMAppAttemptState finalAttemptState, boolean keepContainers) {
+ RMAppAttemptState finalAttemptState) {
super(SchedulerEventType.APP_ATTEMPT_REMOVED);
this.applicationAttemptId = applicationAttemptId;
this.finalAttemptState = finalAttemptState;
- this.keepContainersAcrossAppAttempts = keepContainers;
}
public ApplicationAttemptId getApplicationAttemptID() {
@@ -43,8 +41,4 @@ public class AppAttemptRemovedSchedulerE
public RMAppAttemptState getFinalAttemptState() {
return this.finalAttemptState;
}
-
- public boolean getKeepContainersAcrossAppAttempts() {
- return this.keepContainersAcrossAppAttempts;
- }
}
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java Fri Feb 7 20:33:01 2014
@@ -24,11 +24,7 @@ public enum SchedulerEventType {
NODE_ADDED,
NODE_REMOVED,
NODE_UPDATE,
-
- // Source: RMApp
- APP_ADDED,
- APP_REMOVED,
-
+
// Source: RMAppAttempt
APP_ATTEMPT_ADDED,
APP_ATTEMPT_REMOVED,
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Feb 7 20:33:01 2014
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private
@Unstable
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1565792&r1=1565791&r2=1565792&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Feb 7 20:33:01 2014
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public class FSSchedulerApp extends SchedulerApplicationAttempt {
+public class FSSchedulerApp extends SchedulerApplication {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);