You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2012/07/17 03:43:04 UTC
svn commit: r1362332 [1/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/
hadoop-yarn/hadoop-yarn-server/hadoop...
Author: acmurthy
Date: Tue Jul 17 01:43:03 2012
New Revision: 1362332
URL: http://svn.apache.org/viewvc?rev=1362332&view=rev
Log:
MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal interface to allow schedulers to maintain their own.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
Removed:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jul 17 01:43:03 2012
@@ -132,12 +132,25 @@ Branch-2 ( Unreleased changes )
NEW FEATURES
+ IMPROVEMENTS
+
+ BUG FIXES
+
+Release 2.1.0-alpha - Unreleased
+
+ INCOMPATIBLE CHANGES
+
+ NEW FEATURES
+
MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu)
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
IMPROVEMENTS
+ MAPREDUCE-4440. Changed SchedulerApp and SchedulerNode to be a minimal
+ interface to allow schedulers to maintain their own. (acmurthy)
+
MAPREDUCE-4146. Support limits on task status string length and number of
block locations in branch-2. (Ahmed Radwan via tomwhite)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Jul 17 01:43:03 2012
@@ -192,7 +192,8 @@ public class RMAppImpl implements RMApp
BuilderUtils.newApplicationResourceUsageReport(-1, -1,
Resources.createResource(-1), Resources.createResource(-1),
Resources.createResource(-1));
-
+ private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
+
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
@@ -383,6 +384,7 @@ public class RMAppImpl implements RMApp
this.readLock.lock();
try {
+ ApplicationAttemptId currentApplicationAttemptId = null;
String clientToken = UNAVAILABLE;
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
@@ -393,19 +395,27 @@ public class RMAppImpl implements RMApp
String diags = UNAVAILABLE;
if (allowAccess) {
if (this.currentAttempt != null) {
+ currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
+ } else {
+ currentApplicationAttemptId =
+ BuilderUtils.newApplicationAttemptId(this.applicationId,
+ DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
diags = this.diagnostics.toString();
} else {
appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+ currentApplicationAttemptId =
+ BuilderUtils.newApplicationAttemptId(this.applicationId,
+ DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
return BuilderUtils.newApplicationReport(this.applicationId,
- this.currentAttempt.getAppAttemptId(), this.user, this.queue,
+ currentApplicationAttemptId, this.user, this.queue,
this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java Tue Jul 17 01:43:03 2012
@@ -56,7 +56,7 @@ public class ActiveUsersManager {
* @param user application user
* @param applicationId activated application
*/
- @Lock({Queue.class, SchedulerApp.class})
+ @Lock({Queue.class, SchedulerApplication.class})
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -79,7 +79,7 @@ public class ActiveUsersManager {
* @param user application user
* @param applicationId deactivated application
*/
- @Lock({Queue.class, SchedulerApp.class})
+ @Lock({Queue.class, SchedulerApplication.class})
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -102,7 +102,7 @@ public class ActiveUsersManager {
* resource requests.
* @return number of active users
*/
- @Lock({Queue.class, SchedulerApp.class})
+ @Lock({Queue.class, SchedulerApplication.class})
synchronized public int getNumActiveUsers() {
return activeUsers;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Jul 17 01:43:03 2012
@@ -245,7 +245,8 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
+ synchronized private void allocateNodeLocal(
+ SchedulerNode node, Priority priority,
ResourceRequest nodeLocalRequest, Container container) {
// Update consumption and track allocations
allocate(container);
@@ -273,7 +274,8 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
+ synchronized private void allocateRackLocal(
+ SchedulerNode node, Priority priority,
ResourceRequest rackLocalRequest, Container container) {
// Update consumption and track allocations
@@ -295,7 +297,8 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
+ synchronized private void allocateOffSwitch(
+ SchedulerNode node, Priority priority,
ResourceRequest offSwitchRequest, Container container) {
// Update consumption and track allocations
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerAppReport.java Tue Jul 17 01:43:03 2012
@@ -36,7 +36,7 @@ public class SchedulerAppReport {
private final Collection<RMContainer> reserved;
private final boolean pending;
- public SchedulerAppReport(SchedulerApp app) {
+ public SchedulerAppReport(SchedulerApplication app) {
this.live = app.getLiveContainers();
this.reserved = app.getReservedContainers();
this.pending = app.isPending();
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1362332&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Tue Jul 17 01:43:03 2012
@@ -0,0 +1,43 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Represents an Application from the viewpoint of the scheduler.
+ * Each running Application in the RM corresponds to one instance
+ * of this class.
+ */
+@Private
+@Unstable
+public abstract class SchedulerApplication {
+
+ /**
+ * Get {@link ApplicationAttemptId} of the application master.
+ * @return <code>ApplicationAttemptId</code> of the application master
+ */
+ public abstract ApplicationAttemptId getApplicationAttemptId();
+
+ /**
+ * Get the live containers of the application.
+ * @return live containers of the application
+ */
+ public abstract Collection<RMContainer> getLiveContainers();
+
+ /**
+ * Get the reserved containers of the application.
+ * @return the reserved containers of the application
+ */
+ public abstract Collection<RMContainer> getReservedContainers();
+
+ /**
+ * Is this application pending?
+ * @return true if it is else false.
+ */
+ public abstract boolean isPending();
+
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Tue Jul 17 01:43:03 2012
@@ -18,224 +18,45 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-public class SchedulerNode {
+/**
+ * Represents a YARN Cluster Node from the viewpoint of the scheduler.
+ */
+@Private
+@Unstable
+public abstract class SchedulerNode {
- private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
-
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
- private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
-
- private volatile int numContainers;
-
- private RMContainer reservedContainer;
+ /**
+ * Get hostname.
+ * @return hostname
+ */
+ public abstract String getHostName();
- /* set of containers that are allocated containers */
- private final Map<ContainerId, RMContainer> launchedContainers =
- new HashMap<ContainerId, RMContainer>();
+ /**
+ * Get rackname.
+ * @return rackname
+ */
+ public abstract String getRackName();
- private final RMNode rmNode;
-
- public static final String ANY = "*";
-
- public SchedulerNode(RMNode node) {
- this.rmNode = node;
- this.availableResource.setMemory(node.getTotalCapability().getMemory());
- }
-
- public RMNode getRMNode() {
- return this.rmNode;
- }
-
- public NodeId getNodeID() {
- return this.rmNode.getNodeID();
- }
-
- public String getHttpAddress() {
- return this.rmNode.getHttpAddress();
- }
-
- public String getHostName() {
- return this.rmNode.getHostName();
- }
-
- public String getRackName() {
- return this.rmNode.getRackName();
- }
+ /**
+ * Get used resources on the node.
+ * @return used resources on the node
+ */
+ public abstract Resource getUsedResource();
/**
- * The Scheduler has allocated containers on this node to the
- * given application.
- *
- * @param applicationId application
- * @param rmContainer allocated container
+ * Get available resources on the node.
+ * @return available resources on the node
*/
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
- Container container = rmContainer.getContainer();
- deductAvailableResource(container.getResource());
- ++numContainers;
-
- launchedContainers.put(container.getId(), rmContainer);
-
- LOG.info("Assigned container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " +
- getAvailableResource() + " available");
- }
-
- public synchronized Resource getAvailableResource() {
- return this.availableResource;
- }
-
- public synchronized Resource getUsedResource() {
- return this.usedResource;
- }
-
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId()))
- return true;
- return false;
- }
-
- private synchronized void updateResource(Container container) {
- addAvailableResource(container.getResource());
- --numContainers;
- }
-
+ public abstract Resource getAvailableResource();
+
/**
- * Release an allocated container on this node.
- * @param container container to be released
+ * Get number of active containers on the node.
+ * @return number of active containers on the node
*/
- public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
- LOG.error("Invalid container released " + container);
- return;
- }
-
- /* remove the containers from the nodemanger */
- launchedContainers.remove(container.getId());
- updateResource(container);
-
- LOG.info("Released container " + container.getId() +
- " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() +
- ", which currently has " + numContainers + " containers, " +
- getUsedResource() + " used and " + getAvailableResource()
- + " available" + ", release resources=" + true);
- }
-
-
- private synchronized void addAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid resource addition of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.addTo(availableResource, resource);
- Resources.subtractFrom(usedResource, resource);
- }
-
- private synchronized void deductAvailableResource(Resource resource) {
- if (resource == null) {
- LOG.error("Invalid deduction of null resource for "
- + rmNode.getNodeAddress());
- return;
- }
- Resources.subtractFrom(availableResource, resource);
- Resources.addTo(usedResource, resource);
- }
-
- @Override
- public String toString() {
- return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
- " available=" + getAvailableResource().getMemory() +
- " used=" + getUsedResource().getMemory();
- }
-
- public int getNumContainers() {
- return numContainers;
- }
-
- public synchronized List<RMContainer> getRunningContainers() {
- return new ArrayList<RMContainer>(launchedContainers.values());
- }
-
- public synchronized void reserveResource(
- SchedulerApp application, Priority priority,
- RMContainer reservedContainer) {
- // Check if it's already reserved
- if (this.reservedContainer != null) {
- // Sanity check
- if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
- throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " on node " + reservedContainer.getReservedNode() +
- " when currently" + " reserved resource " + this.reservedContainer +
- " on node " + this.reservedContainer.getReservedNode());
- }
-
- // Cannot reserve more than one application on a given node!
- if (!this.reservedContainer.getContainer().getId().getApplicationAttemptId().equals(
- reservedContainer.getContainer().getId().getApplicationAttemptId())) {
- throw new IllegalStateException("Trying to reserve" +
- " container " + reservedContainer +
- " for application " + application.getApplicationId() +
- " when currently" +
- " reserved container " + this.reservedContainer +
- " on node " + this);
- }
-
- LOG.info("Updated reserved container " +
- reservedContainer.getContainer().getId() + " on node " +
- this + " for application " + application);
- } else {
- LOG.info("Reserved container " + reservedContainer.getContainer().getId() +
- " on node " + this + " for application " + application);
- }
- this.reservedContainer = reservedContainer;
- }
-
- public synchronized void unreserveResource(SchedulerApp application) {
- // Cannot unreserve for wrong application...
- ApplicationAttemptId reservedApplication =
- reservedContainer.getContainer().getId().getApplicationAttemptId();
- if (!reservedApplication.equals(
- application.getApplicationAttemptId())) {
- throw new IllegalStateException("Trying to unreserve " +
- " for application " + application.getApplicationId() +
- " when currently reserved " +
- " for application " + reservedApplication.getApplicationId() +
- " on node " + this);
- }
-
- reservedContainer = null;
- }
-
- public synchronized RMContainer getReservedContainer() {
- return reservedContainer;
- }
+ public abstract int getNumContainers();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Tue Jul 17 01:43:03 2012
@@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
/**
* <code>CSQueue</code> represents a node in the tree of
@@ -150,7 +150,7 @@ extends org.apache.hadoop.yarn.server.re
* @param user user who submitted the application
* @param queue queue to which the application is submitted
*/
- public void submitApplication(SchedulerApp application, String user,
+ public void submitApplication(FiCaSchedulerApp application, String user,
String queue)
throws AccessControlException;
@@ -159,7 +159,7 @@ extends org.apache.hadoop.yarn.server.re
* @param application
* @param queue application queue
*/
- public void finishApplication(SchedulerApp application, String queue);
+ public void finishApplication(FiCaSchedulerApp application, String queue);
/**
* Assign containers to applications in the queue or it's children (if any).
@@ -168,7 +168,7 @@ extends org.apache.hadoop.yarn.server.re
* @return the assignment
*/
public CSAssignment assignContainers(
- Resource clusterResource, SchedulerNode node);
+ Resource clusterResource, FiCaSchedulerNode node);
/**
* A container assigned to the queue has completed.
@@ -182,7 +182,7 @@ extends org.apache.hadoop.yarn.server.re
* @param event event to be sent to the container
*/
public void completedContainer(Resource clusterResource,
- SchedulerApp application, SchedulerNode node,
+ FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer container, ContainerStatus containerStatus,
RMContainerEventType event);
@@ -219,6 +219,6 @@ extends org.apache.hadoop.yarn.server.re
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
- public void recoverContainer(Resource clusterResource, SchedulerApp application,
+ public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
Container container);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Jul 17 01:43:03 2012
@@ -63,11 +63,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@@ -103,10 +103,10 @@ implements ResourceScheduler, CapacitySc
}
};
- static final Comparator<SchedulerApp> applicationComparator =
- new Comparator<SchedulerApp>() {
+ static final Comparator<FiCaSchedulerApp> applicationComparator =
+ new Comparator<FiCaSchedulerApp>() {
@Override
- public int compare(SchedulerApp a1, SchedulerApp a2) {
+ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
return a1.getApplicationId().getId() - a2.getApplicationId().getId();
}
};
@@ -131,8 +131,8 @@ implements ResourceScheduler, CapacitySc
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
- private Map<NodeId, SchedulerNode> nodes =
- new ConcurrentHashMap<NodeId, SchedulerNode>();
+ private Map<NodeId, FiCaSchedulerNode> nodes =
+ new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
private Resource clusterResource =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -141,8 +141,8 @@ implements ResourceScheduler, CapacitySc
private Resource minimumAllocation;
private Resource maximumAllocation;
- private Map<ApplicationAttemptId, SchedulerApp> applications =
- new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
+ private Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
+ new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;
@@ -299,7 +299,7 @@ implements ResourceScheduler, CapacitySc
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
Comparator<CSQueue> queueComparator,
- Comparator<SchedulerApp> applicationComparator,
+ Comparator<FiCaSchedulerApp> applicationComparator,
QueueHook hook) throws IOException {
CSQueue queue;
String[] childQueueNames =
@@ -370,8 +370,8 @@ implements ResourceScheduler, CapacitySc
}
// TODO: Fix store
- SchedulerApp SchedulerApp =
- new SchedulerApp(applicationAttemptId, user, queue,
+ FiCaSchedulerApp SchedulerApp =
+ new FiCaSchedulerApp(applicationAttemptId, user, queue,
queue.getActiveUsersManager(), rmContext, null);
// Submit to the queue
@@ -404,7 +404,7 @@ implements ResourceScheduler, CapacitySc
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
// throw new IOException("Unknown application " + applicationId +
@@ -456,7 +456,7 @@ implements ResourceScheduler, CapacitySc
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + applicationAttemptId);
@@ -551,7 +551,7 @@ implements ResourceScheduler, CapacitySc
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
- SchedulerNode node = getNode(nm.getNodeID());
+ FiCaSchedulerNode node = getNode(nm.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
@@ -578,7 +578,7 @@ implements ResourceScheduler, CapacitySc
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- SchedulerApp reservedApplication =
+ FiCaSchedulerApp reservedApplication =
getApplication(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
@@ -601,10 +601,10 @@ implements ResourceScheduler, CapacitySc
}
- private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
+ private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
@@ -672,7 +672,7 @@ implements ResourceScheduler, CapacitySc
}
private synchronized void addNode(RMNode nodeManager) {
- this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
+ this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
++numNodeManagers;
@@ -681,7 +681,7 @@ implements ResourceScheduler, CapacitySc
}
private synchronized void removeNode(RMNode nodeInfo) {
- SchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
}
@@ -726,7 +726,7 @@ implements ResourceScheduler, CapacitySc
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
- SchedulerApp application = getApplication(applicationAttemptId);
+ FiCaSchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
@@ -735,7 +735,7 @@ implements ResourceScheduler, CapacitySc
}
// Get the node on which the container was allocated
- SchedulerNode node = getNode(container.getNodeId());
+ FiCaSchedulerNode node = getNode(container.getNodeId());
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
@@ -749,24 +749,24 @@ implements ResourceScheduler, CapacitySc
}
@Lock(Lock.NoLock.class)
- SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+ FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
return applications.get(applicationAttemptId);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId applicationAttemptId) {
- SchedulerApp app = getApplication(applicationAttemptId);
+ FiCaSchedulerApp app = getApplication(applicationAttemptId);
return app == null ? null : new SchedulerAppReport(app);
}
@Lock(Lock.NoLock.class)
- SchedulerNode getNode(NodeId nodeId) {
+ FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
private RMContainer getRMContainer(ContainerId containerId) {
- SchedulerApp application =
+ FiCaSchedulerApp application =
getApplication(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -790,7 +790,7 @@ implements ResourceScheduler, CapacitySc
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
- SchedulerNode node = getNode(nodeId);
+ FiCaSchedulerNode node = getNode(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Tue Jul 17 01:43:03 2012
@@ -61,9 +61,9 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -94,11 +94,11 @@ public class LeafQueue implements CSQueu
private float usedCapacity = 0.0f;
private volatile int numContainers;
- Set<SchedulerApp> activeApplications;
- Map<ApplicationAttemptId, SchedulerApp> applicationsMap =
- new HashMap<ApplicationAttemptId, SchedulerApp>();
+ Set<FiCaSchedulerApp> activeApplications;
+ Map<ApplicationAttemptId, FiCaSchedulerApp> applicationsMap =
+ new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
- Set<SchedulerApp> pendingApplications;
+ Set<FiCaSchedulerApp> pendingApplications;
private final Resource minimumAllocation;
private final Resource maximumAllocation;
@@ -126,7 +126,7 @@ public class LeafQueue implements CSQueu
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent,
- Comparator<SchedulerApp> applicationComparator, CSQueue old) {
+ Comparator<FiCaSchedulerApp> applicationComparator, CSQueue old) {
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
@@ -199,8 +199,8 @@ public class LeafQueue implements CSQueu
}
this.pendingApplications =
- new TreeSet<SchedulerApp>(applicationComparator);
- this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
+ new TreeSet<FiCaSchedulerApp>(applicationComparator);
+ this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
private synchronized void setupQueueConfigs(
@@ -580,7 +580,7 @@ public class LeafQueue implements CSQueu
}
@Override
- public void submitApplication(SchedulerApp application, String userName,
+ public void submitApplication(FiCaSchedulerApp application, String userName,
String queue) throws AccessControlException {
// Careful! Locking order is important!
@@ -644,9 +644,9 @@ public class LeafQueue implements CSQueu
}
private synchronized void activateApplications() {
- for (Iterator<SchedulerApp> i=pendingApplications.iterator();
+ for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator();
i.hasNext(); ) {
- SchedulerApp application = i.next();
+ FiCaSchedulerApp application = i.next();
// Check queue limit
if (getNumActiveApplications() >= getMaximumActiveApplications()) {
@@ -666,7 +666,7 @@ public class LeafQueue implements CSQueu
}
}
- private synchronized void addApplication(SchedulerApp application, User user) {
+ private synchronized void addApplication(FiCaSchedulerApp application, User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
@@ -686,7 +686,7 @@ public class LeafQueue implements CSQueu
}
@Override
- public void finishApplication(SchedulerApp application, String queue) {
+ public void finishApplication(FiCaSchedulerApp application, String queue) {
// Careful! Locking order is important!
synchronized (this) {
removeApplication(application, getUser(application.getUser()));
@@ -696,7 +696,7 @@ public class LeafQueue implements CSQueu
parent.finishApplication(application, queue);
}
- public synchronized void removeApplication(SchedulerApp application, User user) {
+ public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
@@ -728,7 +728,7 @@ public class LeafQueue implements CSQueu
);
}
- private synchronized SchedulerApp getApplication(
+ private synchronized FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applicationsMap.get(applicationAttemptId);
}
@@ -738,7 +738,7 @@ public class LeafQueue implements CSQueu
@Override
public synchronized CSAssignment
- assignContainers(Resource clusterResource, SchedulerNode node) {
+ assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
@@ -748,7 +748,7 @@ public class LeafQueue implements CSQueu
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- SchedulerApp application =
+ FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
return new CSAssignment(
assignReservedContainer(application, node, reservedContainer,
@@ -758,7 +758,7 @@ public class LeafQueue implements CSQueu
}
// Try to assign containers to applications in order
- for (SchedulerApp application : activeApplications) {
+ for (FiCaSchedulerApp application : activeApplications) {
if(LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application "
@@ -836,8 +836,8 @@ public class LeafQueue implements CSQueu
}
- private synchronized Resource assignReservedContainer(SchedulerApp application,
- SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+ private synchronized Resource assignReservedContainer(FiCaSchedulerApp application,
+ FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
// Do we still need this reservation?
Priority priority = rmContainer.getReservedPriority();
if (application.getTotalRequiredResources(priority) == 0) {
@@ -880,9 +880,9 @@ public class LeafQueue implements CSQueu
return true;
}
- @Lock({LeafQueue.class, SchedulerApp.class})
+ @Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
- SchedulerApp application, Resource clusterResource, Resource required) {
+ FiCaSchedulerApp application, Resource clusterResource, Resource required) {
String user = application.getUser();
@@ -919,7 +919,7 @@ public class LeafQueue implements CSQueu
}
@Lock(NoLock.class)
- private Resource computeUserLimit(SchedulerApp application,
+ private Resource computeUserLimit(FiCaSchedulerApp application,
Resource clusterResource, Resource required) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@@ -1007,7 +1007,7 @@ public class LeafQueue implements CSQueu
return (a + (b - 1)) / b;
}
- boolean needContainers(SchedulerApp application, Priority priority, Resource required) {
+ boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@@ -1036,7 +1036,7 @@ public class LeafQueue implements CSQueu
}
private CSAssignment assignContainersOnNode(Resource clusterResource,
- SchedulerNode node, SchedulerApp application,
+ FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
Resource assigned = Resources.none();
@@ -1065,7 +1065,7 @@ public class LeafQueue implements CSQueu
}
private Resource assignNodeLocalContainers(Resource clusterResource,
- SchedulerNode node, SchedulerApp application,
+ FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getHostName());
@@ -1081,7 +1081,7 @@ public class LeafQueue implements CSQueu
}
private Resource assignRackLocalContainers(Resource clusterResource,
- SchedulerNode node, SchedulerApp application, Priority priority,
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, node.getRackName());
@@ -1095,8 +1095,8 @@ public class LeafQueue implements CSQueu
return Resources.none();
}
- private Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node,
- SchedulerApp application, Priority priority,
+ private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, RMNode.ANY);
@@ -1111,8 +1111,8 @@ public class LeafQueue implements CSQueu
return Resources.none();
}
- boolean canAssign(SchedulerApp application, Priority priority,
- SchedulerNode node, NodeType type, RMContainer reservedContainer) {
+ boolean canAssign(FiCaSchedulerApp application, Priority priority,
+ FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
@@ -1159,14 +1159,14 @@ public class LeafQueue implements CSQueu
}
private Container getContainer(RMContainer rmContainer,
- SchedulerApp application, SchedulerNode node,
+ FiCaSchedulerApp application, FiCaSchedulerNode node,
Resource capability, Priority priority) {
return (rmContainer != null) ? rmContainer.getContainer() :
createContainer(application, node, capability, priority);
}
- public Container createContainer(SchedulerApp application, SchedulerNode node,
+ public Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
@@ -1192,8 +1192,8 @@ public class LeafQueue implements CSQueu
return container;
}
- private Resource assignContainer(Resource clusterResource, SchedulerNode node,
- SchedulerApp application, Priority priority,
+ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getHostName()
@@ -1267,8 +1267,8 @@ public class LeafQueue implements CSQueu
}
}
- private void reserve(SchedulerApp application, Priority priority,
- SchedulerNode node, RMContainer rmContainer, Container container) {
+ private void reserve(FiCaSchedulerApp application, Priority priority,
+ FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
// Update reserved metrics if this is the first reservation
if (rmContainer == null) {
getMetrics().reserveResource(
@@ -1282,8 +1282,8 @@ public class LeafQueue implements CSQueu
node.reserveResource(application, priority, rmContainer);
}
- private void unreserve(SchedulerApp application, Priority priority,
- SchedulerNode node, RMContainer rmContainer) {
+ private void unreserve(FiCaSchedulerApp application, Priority priority,
+ FiCaSchedulerNode node, RMContainer rmContainer) {
// Done with the reservation?
application.unreserve(node, priority);
node.unreserveResource(application);
@@ -1296,7 +1296,7 @@ public class LeafQueue implements CSQueu
@Override
public void completedContainer(Resource clusterResource,
- SchedulerApp application, SchedulerNode node, RMContainer rmContainer,
+ FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
@@ -1338,7 +1338,7 @@ public class LeafQueue implements CSQueu
}
synchronized void allocateResource(Resource clusterResource,
- SchedulerApp application, Resource resource) {
+ FiCaSchedulerApp application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1363,7 +1363,7 @@ public class LeafQueue implements CSQueu
}
synchronized void releaseResource(Resource clusterResource,
- SchedulerApp application, Resource resource) {
+ FiCaSchedulerApp application, Resource resource) {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1401,7 +1401,7 @@ public class LeafQueue implements CSQueu
this, parent, clusterResource, minimumAllocation);
// Update application properties
- for (SchedulerApp application : activeApplications) {
+ for (FiCaSchedulerApp application : activeApplications) {
synchronized (application) {
computeUserLimitAndSetHeadroom(application, clusterResource,
Resources.none());
@@ -1464,7 +1464,7 @@ public class LeafQueue implements CSQueu
@Override
public void recoverContainer(Resource clusterResource,
- SchedulerApp application, Container container) {
+ FiCaSchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, application, container.getResource());
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1362332&r1=1362331&r2=1362332&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Tue Jul 17 01:43:03 2012
@@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@Private
@Evolving
@@ -421,7 +421,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void submitApplication(SchedulerApp application, String user,
+ public void submitApplication(FiCaSchedulerApp application, String user,
String queue) throws AccessControlException {
synchronized (this) {
@@ -453,7 +453,7 @@ public class ParentQueue implements CSQu
}
}
- private synchronized void addApplication(SchedulerApp application,
+ private synchronized void addApplication(FiCaSchedulerApp application,
String user) {
++numApplications;
@@ -466,7 +466,7 @@ public class ParentQueue implements CSQu
}
@Override
- public void finishApplication(SchedulerApp application, String queue) {
+ public void finishApplication(FiCaSchedulerApp application, String queue) {
synchronized (this) {
removeApplication(application, application.getUser());
@@ -478,7 +478,7 @@ public class ParentQueue implements CSQu
}
}
- public synchronized void removeApplication(SchedulerApp application,
+ public synchronized void removeApplication(FiCaSchedulerApp application,
String user) {
--numApplications;
@@ -516,7 +516,7 @@ public class ParentQueue implements CSQu
@Override
public synchronized CSAssignment assignContainers(
- Resource clusterResource, SchedulerNode node) {
+ Resource clusterResource, FiCaSchedulerNode node) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
@@ -594,14 +594,14 @@ public class ParentQueue implements CSQu
}
- private boolean canAssign(SchedulerNode node) {
+ private boolean canAssign(FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(node.getAvailableResource(),
minimumAllocation);
}
synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
- SchedulerNode node) {
+ FiCaSchedulerNode node) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
@@ -654,7 +654,7 @@ public class ParentQueue implements CSQu
@Override
public void completedContainer(Resource clusterResource,
- SchedulerApp application, SchedulerNode node,
+ FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
@@ -715,7 +715,7 @@ public class ParentQueue implements CSQu
@Override
public void recoverContainer(Resource clusterResource,
- SchedulerApp application, Container container) {
+ FiCaSchedulerApp application, Container container) {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, container.getResource());
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1362332&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Tue Jul 17 01:43:03 2012
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * Represents an Application from the viewpoint of the scheduler.
+ * Each running Application in the RM corresponds to one instance
+ * of this class.
+ */
+@SuppressWarnings("unchecked")
+@Private
+@Unstable
+public class FiCaSchedulerApp extends SchedulerApplication {
+
+ private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
+
+ private final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private final AppSchedulingInfo appSchedulingInfo;
+ private final Queue queue;
+
+ private final Resource currentConsumption = recordFactory
+ .newRecordInstance(Resource.class);
+ private Resource resourceLimit = recordFactory
+ .newRecordInstance(Resource.class);
+
+ private Map<ContainerId, RMContainer> liveContainers
+ = new HashMap<ContainerId, RMContainer>();
+ private List<RMContainer> newlyAllocatedContainers =
+ new ArrayList<RMContainer>();
+
+ final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
+ new HashMap<Priority, Map<NodeId, RMContainer>>();
+
+ /**
+ * Count how many times the application has been given an opportunity
+ * to schedule a task at each priority. Each time the scheduler
+ * asks the application for a task at this priority, it is incremented,
+ * and each time the application successfully schedules a task, it
+ * is reset to 0.
+ */
+ Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+
+ Multiset<Priority> reReservations = HashMultiset.create();
+
+ Resource currentReservation = recordFactory
+ .newRecordInstance(Resource.class);
+
+ private final RMContext rmContext;
+ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ RMContext rmContext, ApplicationStore store) {
+ this.rmContext = rmContext;
+ this.appSchedulingInfo =
+ new AppSchedulingInfo(applicationAttemptId, user, queue,
+ activeUsersManager, store);
+ this.queue = queue;
+ }
+
+ public ApplicationId getApplicationId() {
+ return this.appSchedulingInfo.getApplicationId();
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.appSchedulingInfo.getApplicationAttemptId();
+ }
+
+ public String getUser() {
+ return this.appSchedulingInfo.getUser();
+ }
+
+ public synchronized void updateResourceRequests(
+ List<ResourceRequest> requests) {
+ this.appSchedulingInfo.updateResourceRequests(requests);
+ }
+
+ public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
+ return this.appSchedulingInfo.getResourceRequests(priority);
+ }
+
+ public int getNewContainerId() {
+ return this.appSchedulingInfo.getNewContainerId();
+ }
+
+ public Collection<Priority> getPriorities() {
+ return this.appSchedulingInfo.getPriorities();
+ }
+
+ public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
+ return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
+ }
+
+ public synchronized int getTotalRequiredResources(Priority priority) {
+ return getResourceRequest(priority, RMNode.ANY).getNumContainers();
+ }
+
+ public Resource getResource(Priority priority) {
+ return this.appSchedulingInfo.getResource(priority);
+ }
+
+ /**
+ * Is this application pending?
+ * @return true if it is else false.
+ */
+ @Override
+ public boolean isPending() {
+ return this.appSchedulingInfo.isPending();
+ }
+
+ public String getQueueName() {
+ return this.appSchedulingInfo.getQueueName();
+ }
+
+ /**
+ * Get the list of live containers
+ * @return All of the live containers
+ */
+ @Override
+ public synchronized Collection<RMContainer> getLiveContainers() {
+ return new ArrayList<RMContainer>(liveContainers.values());
+ }
+
+ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ // Cleanup all scheduling information
+ this.appSchedulingInfo.stop(rmAppAttemptFinalState);
+ }
+
+ public synchronized void containerLaunchedOnNode(ContainerId containerId,
+ NodeId nodeId) {
+ // Inform the container
+ RMContainer rmContainer =
+ getRMContainer(containerId);
+ if (rmContainer == null) {
+ // Some unknown container sneaked into the system. Kill it.
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+ return;
+ }
+
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.LAUNCHED));
+ }
+
+ synchronized public void containerCompleted(RMContainer rmContainer,
+ ContainerStatus containerStatus, RMContainerEventType event) {
+
+ Container container = rmContainer.getContainer();
+ ContainerId containerId = container.getId();
+
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerFinishedEvent(
+ containerId,
+ containerStatus,
+ event)
+ );
+ LOG.info("Completed container: " + rmContainer.getContainerId() +
+ " in state: " + rmContainer.getState() + " event:" + event);
+
+ // Remove from the list of containers
+ liveContainers.remove(rmContainer.getContainerId());
+
+ RMAuditLogger.logSuccess(getUser(),
+ AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
+ getApplicationId(), containerId);
+
+ // Update usage metrics
+ Resource containerResource = rmContainer.getContainer().getResource();
+ queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+ Resources.subtractFrom(currentConsumption, containerResource);
+ }
+
+ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+ Priority priority, ResourceRequest request,
+ Container container) {
+
+ // Required sanity check - AM can call 'allocate' to update resource
+ // request without locking the scheduler, hence we need to check
+ if (getTotalRequiredResources(priority) <= 0) {
+ return null;
+ }
+
+ // Create RMContainer
+ RMContainer rmContainer = new RMContainerImpl(container, this
+ .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+ .getDispatcher().getEventHandler(), this.rmContext
+ .getContainerAllocationExpirer());
+
+ // Add it to allContainers list.
+ newlyAllocatedContainers.add(rmContainer);
+ liveContainers.put(container.getId(), rmContainer);
+
+ // Update consumption and track allocations
+ appSchedulingInfo.allocate(type, node, priority, request, container);
+ Resources.addTo(currentConsumption, container.getResource());
+
+ // Inform the container
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocate: applicationAttemptId="
+ + container.getId().getApplicationAttemptId()
+ + " container=" + container.getId() + " host="
+ + container.getNodeId().getHost() + " type=" + type);
+ }
+ RMAuditLogger.logSuccess(getUser(),
+ AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
+ getApplicationId(), container.getId());
+
+ return rmContainer;
+ }
+
+ synchronized public List<Container> pullNewlyAllocatedContainers() {
+ List<Container> returnContainerList = new ArrayList<Container>(
+ newlyAllocatedContainers.size());
+ for (RMContainer rmContainer : newlyAllocatedContainers) {
+ rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
+ RMContainerEventType.ACQUIRED));
+ returnContainerList.add(rmContainer.getContainer());
+ }
+ newlyAllocatedContainers.clear();
+ return returnContainerList;
+ }
+
+ public Resource getCurrentConsumption() {
+ return this.currentConsumption;
+ }
+
+ synchronized public void showRequests() {
+ if (LOG.isDebugEnabled()) {
+ for (Priority priority : getPriorities()) {
+ Map<String, ResourceRequest> requests = getResourceRequests(priority);
+ if (requests != null) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId() +
+ " headRoom=" + getHeadroom() +
+ " currentConsumption=" + currentConsumption.getMemory());
+ for (ResourceRequest request : requests.values()) {
+ LOG.debug("showRequests:" + " application=" + getApplicationId()
+ + " request=" + request);
+ }
+ }
+ }
+ }
+ }
+
+ public synchronized RMContainer getRMContainer(ContainerId id) {
+ return liveContainers.get(id);
+ }
+
+ synchronized public void resetSchedulingOpportunities(Priority priority) {
+ this.schedulingOpportunities.setCount(priority, 0);
+ }
+
+ synchronized public void addSchedulingOpportunity(Priority priority) {
+ this.schedulingOpportunities.setCount(priority,
+ schedulingOpportunities.count(priority) + 1);
+ }
+
+ /**
+ * Return the number of times the application has been given an opportunity
+ * to schedule a task at the given priority since the last time it
+ * successfully did so.
+ */
+ synchronized public int getSchedulingOpportunities(Priority priority) {
+ return this.schedulingOpportunities.count(priority);
+ }
+
+ synchronized void resetReReservations(Priority priority) {
+ this.reReservations.setCount(priority, 0);
+ }
+
+ synchronized void addReReservation(Priority priority) {
+ this.reReservations.add(priority);
+ }
+
+ synchronized public int getReReservations(Priority priority) {
+ return this.reReservations.count(priority);
+ }
+
+ public synchronized int getNumReservedContainers(Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ return (reservedContainers == null) ? 0 : reservedContainers.size();
+ }
+
+ /**
+ * Get total current reservations.
+ * Used only by unit tests
+ * @return total current reservations
+ */
+ @Stable
+ @Private
+ public synchronized Resource getCurrentReservation() {
+ return currentReservation;
+ }
+
+ public synchronized RMContainer reserve(FiCaSchedulerNode node, Priority priority,
+ RMContainer rmContainer, Container container) {
+ // Create RMContainer if necessary
+ if (rmContainer == null) {
+ rmContainer =
+ new RMContainerImpl(container, getApplicationAttemptId(),
+ node.getNodeID(), rmContext.getDispatcher().getEventHandler(),
+ rmContext.getContainerAllocationExpirer());
+
+ Resources.addTo(currentReservation, container.getResource());
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+ } else {
+ // Note down the re-reservation
+ addReReservation(priority);
+ }
+ rmContainer.handle(new RMContainerReservedEvent(container.getId(),
+ container.getResource(), node.getNodeID(), priority));
+
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers == null) {
+ reservedContainers = new HashMap<NodeId, RMContainer>();
+ this.reservedContainers.put(priority, reservedContainers);
+ }
+ reservedContainers.put(node.getNodeID(), rmContainer);
+
+ LOG.info("Application " + getApplicationId()
+ + " reserved container " + rmContainer
+ + " on node " + node + ", currently has " + reservedContainers.size()
+ + " at priority " + priority
+ + "; currentReservation " + currentReservation.getMemory());
+
+ return rmContainer;
+ }
+
+ public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+ if (reservedContainers.isEmpty()) {
+ this.reservedContainers.remove(priority);
+ }
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+
+ Resource resource = reservedContainer.getContainer().getResource();
+ Resources.subtractFrom(currentReservation, resource);
+
+ LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ + node + ", currently has " + reservedContainers.size() + " at priority "
+ + priority + "; currentReservation " + currentReservation);
+ }
+
+ /**
+ * Has the application reserved the given <code>node</code> at the
+ * given <code>priority</code>?
+ * @param node node to be checked
+ * @param priority priority of reserved container
+ * @return true is reserved, false if not
+ */
+ public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) {
+ Map<NodeId, RMContainer> reservedContainers =
+ this.reservedContainers.get(priority);
+ if (reservedContainers != null) {
+ return reservedContainers.containsKey(node.getNodeID());
+ }
+ return false;
+ }
+
+ public synchronized float getLocalityWaitFactor(
+ Priority priority, int clusterNodes) {
+ // Estimate: Required unique resources (i.e. hosts + racks)
+ int requiredResources =
+ Math.max(this.getResourceRequests(priority).size() - 1, 0);
+
+ // waitFactor can't be more than '1'
+ // i.e. no point skipping more than clustersize opportunities
+ return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+ }
+
+ /**
+ * Get the list of reserved containers
+ * @return All of the reserved containers.
+ */
+ @Override
+ public synchronized List<RMContainer> getReservedContainers() {
+ List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
+ for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
+ this.reservedContainers.entrySet()) {
+ reservedContainers.addAll(e.getValue().values());
+ }
+ return reservedContainers;
+ }
+
+ public synchronized void setHeadroom(Resource globalLimit) {
+ this.resourceLimit = globalLimit;
+ }
+
+ /**
+ * Get available headroom in terms of resources for the application's user.
+ * @return available resource headroom
+ */
+ public synchronized Resource getHeadroom() {
+ // Corner case to deal with applications being slightly over-limit
+ if (resourceLimit.getMemory() < 0) {
+ resourceLimit.setMemory(0);
+ }
+
+ return resourceLimit;
+ }
+
+ public Queue getQueue() {
+ return queue;
+ }
+}