You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [13/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 23:49:39 2014
@@ -18,34 +18,101 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
-public abstract class AbstractYarnScheduler implements ResourceScheduler {
+import com.google.common.util.concurrent.SettableFuture;
+
+
+@SuppressWarnings("unchecked")
+public abstract class AbstractYarnScheduler
+ <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
+ extends AbstractService implements ResourceScheduler {
+
+ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
+
+ // Nodes in the cluster, indexed by NodeId
+ protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
+
+ // Whole capacity of the cluster
+ protected Resource clusterResource = Resource.newInstance(0, 0);
+
+ protected Resource minimumAllocation;
+ protected Resource maximumAllocation;
protected RMContext rmContext;
- protected Map<ApplicationId, SchedulerApplication> applications;
+ protected Map<ApplicationId, SchedulerApplication<T>> applications;
+ protected int nmExpireInterval;
+
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
+  /**
+   * Creates the scheduler as a named {@link AbstractService}.
+   *
+   * @param name the service name handed to {@link AbstractService}
+   */
+  public AbstractYarnScheduler(String name) {
+    super(name);
+  }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ nmExpireInterval =
+ conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ createReleaseCache();
+ super.serviceInit(conf);
+ }
+
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
- SchedulerApplication app = applications.get(appId);
+ SchedulerApplication<T> app = applications.get(appId);
List<Container> containerList = new ArrayList<Container>();
RMApp appImpl = this.rmContext.getRMApps().get(appId);
if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) {
@@ -64,14 +131,333 @@ public abstract class AbstractYarnSchedu
return containerList;
}
+ /**
+ * Returns the scheduler's own application map (the field itself, not a
+ * copy), keyed by application id.
+ */
- public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
+ public Map<ApplicationId, SchedulerApplication<T>>
+ getSchedulerApplications() {
return applications;
}
-
+
+  /** @return the whole capacity of the cluster as tracked by this scheduler */
+  @Override
+  public Resource getClusterResource() {
+    return this.clusterResource;
+  }
+
+  /** @return the value held in {@code minimumAllocation} */
+  @Override
+  public Resource getMinimumResourceCapability() {
+    return this.minimumAllocation;
+  }
+
+  /** @return the value held in {@code maximumAllocation} */
+  @Override
+  public Resource getMaximumResourceCapability() {
+    return this.maximumAllocation;
+  }
+
+  /**
+   * Tells the owning application attempt that a container was launched on
+   * {@code node}. If no current attempt exists for the container's
+   * application, the launch is logged and a {@link RMNodeCleanContainerEvent}
+   * is dispatched for the container on that node.
+   *
+   * @param containerId the container reported as launched
+   * @param node the scheduler node the container was launched on
+   */
+  protected void containerLaunchedOnNode(ContainerId containerId,
+      SchedulerNode node) {
+    // Look up the attempt that owns the launched container.
+    SchedulerApplicationAttempt owner =
+        getCurrentAttemptForContainer(containerId);
+    if (owner != null) {
+      owner.containerLaunchedOnNode(containerId, node.getNodeID());
+      return;
+    }
+    LOG.info("Unknown application "
+        + containerId.getApplicationAttemptId().getApplicationId()
+        + " launched container " + containerId + " on node: " + node);
+    this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+  }
+
+ public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
+ SchedulerApplication<T> app =
+ applications.get(applicationAttemptId.getApplicationId());
+ return app == null ? null : app.getCurrentAppAttempt();
+ }
+
+  /**
+   * @return a {@link SchedulerAppReport} for the current attempt of the
+   *         application owning {@code appAttemptId}, or {@code null} if there
+   *         is no such attempt
+   */
+  @Override
+  public SchedulerAppReport getSchedulerAppInfo(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+    if (attempt != null) {
+      return new SchedulerAppReport(attempt);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the resource usage report of the current attempt of the
+   * application owning {@code appAttemptId}.
+   *
+   * @param appAttemptId identifies the application (looked up by its
+   *          application id)
+   * @return the usage report, or {@code null} if the application is unknown
+   *         or has no current attempt
+   */
+  @Override
+  public ApplicationResourceUsageReport getAppResourceUsageReport(
+      ApplicationAttemptId appAttemptId) {
+    SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
+    if (attempt == null) {
+      if (LOG.isDebugEnabled()) {
+        // Message names the operation actually requested (it previously
+        // repeated the getSchedulerAppInfo text).
+        LOG.debug("Request for resource usage report of unknown attempt "
+            + appAttemptId);
+      }
+      return null;
+    }
+    return attempt.getResourceUsageReport();
+  }
+
+ public T getCurrentAttemptForContainer(ContainerId containerId) {
+ return getApplicationAttempt(containerId.getApplicationAttemptId());
+ }
+
+  /**
+   * Looks up a container through the current attempt of its application.
+   *
+   * @return the container, or {@code null} if the application has no current
+   *         attempt or the attempt returns none for {@code containerId}
+   */
+  @Override
+  public RMContainer getRMContainer(ContainerId containerId) {
+    SchedulerApplicationAttempt attempt =
+        getCurrentAttemptForContainer(containerId);
+    if (attempt == null) {
+      return null;
+    }
+    return attempt.getRMContainer(containerId);
+  }
+
+  /**
+   * @return a report for the node with the given id, or {@code null} if the
+   *         node is not present in {@code nodes}
+   */
+  @Override
+  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    N node = nodes.get(nodeId);
+    if (node == null) {
+      return null;
+    }
+    return new SchedulerNodeReport(node);
+  }
+
/**
 * Default implementation: moving applications between queues is not
 * supported, so this always throws.
 *
 * @throws YarnException always
 */
@Override
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException {
String schedulerName = getClass().getSimpleName();
throw new YarnException(schedulerName
+ " does not support moving apps between queues");
}
+
+  /**
+   * Dispatches a {@link RMNodeCleanContainerEvent} for a reported container
+   * the scheduler will not recover, unless the NM reports it as COMPLETE.
+   */
+  private void killOrphanContainerOnNode(RMNode node,
+      NMContainerStatus container) {
+    if (container.getContainerState().equals(ContainerState.COMPLETE)) {
+      // A completed container needs no clean-up request.
+      return;
+    }
+    ContainerId orphanId = container.getContainerId();
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeCleanContainerEvent(node.getNodeID(), orphanId));
+  }
+
+  /**
+   * Rebuilds scheduler state from the containers an NM reported, when
+   * work-preserving recovery is enabled.
+   * <p>
+   * Does nothing if recovery is disabled, if there are no reports, or if the
+   * reporting node is not (yet) known to the scheduler. For each report:
+   * <ul>
+   * <li>if the application is unknown to the RM, is an unmanaged AM, has no
+   * {@link SchedulerApplication}, or has no current attempt, the container is
+   * not recovered and, unless it is already COMPLETE, a clean-up event is sent
+   * for it;</li>
+   * <li>otherwise an {@link RMContainer} is created and recovered, and the
+   * scheduler node, queue and scheduler attempt are updated with it. It is
+   * flagged as the AM container if it matches the master container of the RM
+   * app's current attempt, and released immediately if the application had
+   * already asked to release it.</li>
+   * </ul>
+   *
+   * @param containerReports statuses reported by the NM; may be {@code null}
+   *          or empty
+   * @param nm the node that reported the containers
+   */
+  public synchronized void recoverContainersOnNode(
+      List<NMContainerStatus> containerReports, RMNode nm) {
+    if (!rmContext.isWorkPreservingRecoveryEnabled()
+        || containerReports == null || containerReports.isEmpty()) {
+      return;
+    }
+
+    // All reports come from the same node; without its scheduler node there
+    // is nothing to attach recovered containers to (this used to NPE).
+    N schedulerNode = nodes.get(nm.getNodeID());
+    if (schedulerNode == null) {
+      LOG.error("Skip recovering containers on unknown node " + nm.getNodeID());
+      return;
+    }
+
+    for (NMContainerStatus container : containerReports) {
+      ApplicationId appId =
+          container.getContainerId().getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.error("Skip recovering container " + container
+            + " for unknown application.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Unmanaged AM recovery is addressed in YARN-1815
+      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        LOG.info("Skip recovering container " + container + " for unmanaged AM. "
+            + rmApp.getApplicationId());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplication<T> schedulerApp = applications.get(appId);
+      if (schedulerApp == null) {
+        LOG.info("Skip recovering container " + container
+            + " for unknown SchedulerApplication. Application current state is "
+            + rmApp.getState());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Check before mutating any state: a null attempt previously caused an
+      // NPE after the RMContainer and scheduler node had already been updated.
+      SchedulerApplicationAttempt schedulerAttempt =
+          schedulerApp.getCurrentAppAttempt();
+      if (schedulerAttempt == null) {
+        LOG.info("Skip recovering container " + container
+            + " as application " + appId + " has no current attempt.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      LOG.info("Recovering container " + container);
+
+      // create container
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+      // recover RMContainer
+      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
+          container));
+
+      // recover scheduler node
+      schedulerNode.recoverContainer(rmContainer);
+
+      // recover queue: update headroom etc.
+      Queue queue = schedulerAttempt.getQueue();
+      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+      // recover scheduler attempt
+      schedulerAttempt.recoverContainer(rmContainer);
+
+      // Mark the recovered container as the AM container if it is the master
+      // container recorded for the RM app's current attempt.
+      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+      if (appAttempt != null) {
+        Container masterContainer = appAttempt.getMasterContainer();
+        if (masterContainer != null
+            && masterContainer.getId().equals(rmContainer.getContainerId())) {
+          ((RMContainerImpl) rmContainer).setAMContainer(true);
+        }
+      }
+
+      // The application may have asked to release this container before the
+      // NM reported it (see releaseContainers); honour that request now.
+      synchronized (schedulerAttempt) {
+        Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
+        if (releases.contains(container.getContainerId())) {
+          rmContainer.handle(new RMContainerFinishedEvent(container
+              .getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
+              container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
+              RMContainerEventType.RELEASED));
+          releases.remove(container.getContainerId());
+          LOG.info(container.getContainerId() + " is released by application.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds an {@link RMContainerImpl} from a container status reported by an
+   * NM. It uses the node's id and HTTP address, and the user of the owning
+   * {@link SchedulerApplication}. The caller must ensure that application is
+   * present in {@code applications}.
+   */
+  private RMContainer recoverAndCreateContainer(NMContainerStatus status,
+      RMNode node) {
+    NodeId nodeId = node.getNodeID();
+    Container container = Container.newInstance(status.getContainerId(),
+        nodeId, node.getHttpAddress(), status.getAllocatedResource(),
+        status.getPriority(), null);
+    ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId();
+    String owner = applications.get(attemptId.getApplicationId()).getUser();
+    return new RMContainerImpl(container, attemptId, nodeId, owner, rmContext,
+        status.getCreationTime());
+  }
+
+  /**
+   * Gives a container's resource requests back to its application attempt,
+   * for a container preempted before the AM pulled it. Does nothing when the
+   * {@link RMContainer} carries no requests (per the original note, this is
+   * the case once the AM has pulled it), or when no current attempt exists.
+   *
+   * @param rmContainer the preempted container whose requests are recovered
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Requests are expected to be gone once the container is ACQUIRED.
+    if (requests != null) {
+      SchedulerApplicationAttempt attempt =
+          getCurrentAttemptForContainer(rmContainer.getContainerId());
+      if (attempt != null) {
+        attempt.recoverResourceRequests(requests);
+      }
+    }
+  }
+
+  /**
+   * Schedules a one-shot cleanup of the pending-release caches to run
+   * {@code nmExpireInterval} ms from now.
+   * <p>
+   * When the cleanup runs, every container id still pending release on an
+   * application's current attempt is audit-logged as an unauthorized or
+   * invalid release, and the cache is then cleared. {@code releaseContainers}
+   * only parks ids during that interval because the containers may still be
+   * reported during recovery.
+   * <p>
+   * Applications without a current attempt are skipped; previously they
+   * caused a NullPointerException that aborted the whole cleanup. The timer
+   * thread is a daemon and is cancelled after its single task, so no idle
+   * non-daemon thread is left behind.
+   */
+  protected void createReleaseCache() {
+    final Timer timer = new Timer("ReleaseRequestCacheCleaner", true);
+    // Cleanup the cache after nm expire interval.
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          for (SchedulerApplication<T> app : applications.values()) {
+            T attempt = app.getCurrentAppAttempt();
+            if (attempt == null) {
+              // No attempt, hence no pending-release cache to clean.
+              continue;
+            }
+            synchronized (attempt) {
+              for (ContainerId containerId : attempt.getPendingRelease()) {
+                RMAuditLogger.logFailure(
+                    app.getUser(),
+                    AuditConstants.RELEASE_CONTAINER,
+                    "Unauthorized access or invalid container",
+                    "Scheduler",
+                    "Trying to release container not owned by app or with invalid id.",
+                    attempt.getApplicationId(), containerId);
+              }
+              attempt.getPendingRelease().clear();
+            }
+          }
+          LOG.info("Release request cache is cleaned up");
+        } finally {
+          // One-shot timer: let its thread terminate once the task is done.
+          timer.cancel();
+        }
+      }
+    }, nmExpireInterval);
+  }
+
+  /**
+   * Cleans up a completed container.
+   *
+   * @param rmContainer the completed container. {@code releaseContainers}
+   *          passes {@code null} here for ids it cannot resolve, so
+   *          implementations should tolerate {@code null}
+   * @param containerStatus the final status to record for the container
+   * @param event the event type that completed the container
+   */
+  protected abstract void completedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus, RMContainerEventType event);
+
+  /**
+   * Releases containers on behalf of an application attempt.
+   * <p>
+   * An id the scheduler cannot resolve is handled according to the time
+   * elapsed since the cluster timestamp. Within {@code nmExpireInterval} ms it
+   * is added to the attempt's pending-release cache, because it may still be
+   * reported during recovery. After that it is audit-logged as an
+   * unauthorized or invalid release.
+   * <p>
+   * NOTE(review): {@link #completedContainer} is called for every id,
+   * including unresolved ones, in which case it receives {@code null}.
+   */
+  protected void releaseContainers(List<ContainerId> containers,
+      SchedulerApplicationAttempt attempt) {
+    for (ContainerId containerId : containers) {
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        long elapsedSinceRmStart =
+            System.currentTimeMillis() - ResourceManager.getClusterTimeStamp();
+        if (elapsedSinceRmStart >= nmExpireInterval) {
+          RMAuditLogger.logFailure(attempt.getUser(),
+              AuditConstants.RELEASE_CONTAINER,
+              "Unauthorized access or invalid container", "Scheduler",
+              "Trying to release container not owned by app or with invalid id.",
+              attempt.getApplicationId(), containerId);
+        } else {
+          LOG.info(containerId + " doesn't exist. Add the container"
+              + " to the release request cache as it maybe on recovery.");
+          synchronized (attempt) {
+            attempt.getPendingRelease().add(containerId);
+          }
+        }
+      }
+      completedContainer(rmContainer,
+          SchedulerUtils.createAbnormalContainerStatus(containerId,
+              SchedulerUtils.RELEASED_CONTAINER),
+          RMContainerEventType.RELEASED);
+    }
+  }
+
+ public SchedulerNode getSchedulerNode(NodeId nodeId) {
+ return nodes.get(nodeId);
+ }
+
+  /**
+   * Moves every application that {@link #getAppsInQueue} reports for
+   * {@code sourceQueue} to {@code destQueue}. It does this by dispatching one
+   * {@link RMAppMoveEvent} per application. The futures handed to the events
+   * are not awaited, so individual move failures are not reported here.
+   *
+   * @throws YarnException if {@link #getQueueInfo} fails for
+   *           {@code destQueue}, or {@code sourceQueue} does not exist
+   */
+  @Override
+  public synchronized void moveAllApps(String sourceQueue, String destQueue)
+      throws YarnException {
+    // check that the destination queue can be looked up
+    try {
+      getQueueInfo(destQueue, false, false);
+    } catch (IOException e) {
+      // Pass the exception as the cause so its stack trace is logged;
+      // LOG.warn(Object) only records e.toString().
+      LOG.warn("Failed to get queue info for destination queue " + destQueue,
+          e);
+      throw new YarnException(e);
+    }
+    // check that the source queue is valid
+    List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue);
+    if (apps == null) {
+      String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    // generate move events for each pending/running app
+    for (ApplicationAttemptId app : apps) {
+      SettableFuture<Object> future = SettableFuture.create();
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future));
+    }
+  }
+
+  /**
+   * Kills every application that {@link #getAppsInQueue} reports for
+   * {@code queueName}, by dispatching an {@link RMAppEventType#KILL} event for
+   * each one.
+   *
+   * @throws YarnException if {@code getAppsInQueue} returns {@code null},
+   *           which is treated as a nonexistent queue
+   */
+  @Override
+  public synchronized void killAllAppsInQueue(String queueName)
+      throws YarnException {
+    List<ApplicationAttemptId> attemptsInQueue = getAppsInQueue(queueName);
+    if (attemptsInQueue == null) {
+      String errMsg = "The specified Queue: " + queueName + " doesn't exist";
+      LOG.warn(errMsg);
+      throw new YarnException(errMsg);
+    }
+    for (ApplicationAttemptId attemptId : attemptsInQueue) {
+      ApplicationId appId = attemptId.getApplicationId();
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMAppEvent(appId, RMAppEventType.KILL));
+    }
+  }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -52,10 +54,13 @@ public class AppSchedulingInfo {
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
private final ApplicationAttemptId applicationAttemptId;
final ApplicationId applicationId;
- private final String queueName;
+ private String queueName;
Queue queue;
final String user;
- private final AtomicInteger containerIdCounter = new AtomicInteger(0);
+ // TODO making containerIdCounter long
+ private final AtomicInteger containerIdCounter;
+ private final int EPOCH_BIT_MASK = 0x3ff;
+ private final int EPOCH_BIT_SHIFT = 22;
final Set<Priority> priorities = new TreeSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
@@ -68,15 +73,19 @@ public class AppSchedulingInfo {
/* Allocated by scheduler */
boolean pending = true; // for app metrics
-
+
+
+ /**
+ * Creates the scheduling bookkeeping for one application attempt.
+ *
+ * @param appAttemptId the attempt this object tracks
+ * @param user the owning user, stored in {@code user}
+ * @param queue the queue for this attempt; its name is cached in queueName
+ * @param activeUsersManager stored in {@code activeUsersManager}
+ * @param epoch its low 10 bits (EPOCH_BIT_MASK = 0x3ff) are shifted left by
+ * EPOCH_BIT_SHIFT (22) to seed containerIdCounter
+ */
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager) {
+ String user, Queue queue, ActiveUsersManager activeUsersManager,
+ int epoch) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.queueName = queue.getQueueName();
this.user = user;
this.activeUsersManager = activeUsersManager;
+ // Seed the counter with the epoch, presumably so container ids differ
+ // across RM restarts. NOTE(review): for epoch >= 512, (epoch & 0x3ff) << 22
+ // sets bit 31, so the counter starts negative. If more than 2^22 - 1 ids
+ // are handed out from it, the low bits carry into the epoch bits. Confirm
+ // this is acceptable (see the TODO about making the counter long).
+ this.containerIdCounter = new AtomicInteger(
+ (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT);
}
public ApplicationId getApplicationId() {
@@ -118,9 +127,10 @@ public class AppSchedulingInfo {
* by the application.
*
* @param requests resources to be acquired
+ * @param recoverPreemptedRequest recover Resource Request on preemption
*/
synchronized public void updateResourceRequests(
- List<ResourceRequest> requests) {
+ List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
@@ -154,8 +164,13 @@ public class AppSchedulingInfo {
asks = new HashMap<String, ResourceRequest>();
this.requests.put(priority, asks);
this.priorities.add(priority);
- } else if (updatePendingResources) {
- lastRequest = asks.get(resourceName);
+ }
+ lastRequest = asks.get(resourceName);
+
+ if (recoverPreemptedRequest && lastRequest != null) {
+ // Increment the number of containers to 1, as it is recovering a
+ // single container.
+ request.setNumContainers(lastRequest.getNumContainers() + 1);
}
asks.put(resourceName, request);
@@ -245,14 +260,16 @@ public class AppSchedulingInfo {
* @param container
* the containers allocated.
*/
- synchronized public void allocate(NodeType type, SchedulerNode node,
- Priority priority, ResourceRequest request, Container container) {
+ synchronized public List<ResourceRequest> allocate(NodeType type,
+ SchedulerNode node, Priority priority, ResourceRequest request,
+ Container container) {
+ List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
if (type == NodeType.NODE_LOCAL) {
- allocateNodeLocal(node, priority, request, container);
+ allocateNodeLocal(node, priority, request, container, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
- allocateRackLocal(node, priority, request, container);
+ allocateRackLocal(node, priority, request, container, resourceRequests);
} else {
- allocateOffSwitch(node, priority, request, container);
+ allocateOffSwitch(node, priority, request, container, resourceRequests);
}
QueueMetrics metrics = queue.getMetrics();
if (pending) {
@@ -270,6 +287,7 @@ public class AppSchedulingInfo {
+ " resource=" + request.getCapability());
}
metrics.allocateResources(user, 1, request.getCapability(), true);
+ return resourceRequests;
}
/**
@@ -279,9 +297,9 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateNodeLocal(
- SchedulerNode node, Priority priority,
- ResourceRequest nodeLocalRequest, Container container) {
+ synchronized private void allocateNodeLocal(SchedulerNode node,
+ Priority priority, ResourceRequest nodeLocalRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
@@ -295,7 +313,14 @@ public class AppSchedulingInfo {
this.requests.get(priority).remove(node.getRackName());
}
- decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+ ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest.ANY);
+ decrementOutstanding(offRackRequest);
+
+ // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+ resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+ resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+ resourceRequests.add(cloneResourceRequest(offRackRequest));
}
/**
@@ -305,16 +330,22 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateRackLocal(
- SchedulerNode node, Priority priority,
- ResourceRequest rackLocalRequest, Container container) {
+ synchronized private void allocateRackLocal(SchedulerNode node,
+ Priority priority, ResourceRequest rackLocalRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
+ // resourceRequests is an output list. Single-container copies (see
+ // cloneResourceRequest) of the rack-local and ANY requests are appended
+ // to it below.
// Update future requirements
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
- decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+ ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest.ANY);
+ decrementOutstanding(offRackRequest);
+
+ // Update cloned RackLocal and OffRack requests for recovery
+ resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+ resourceRequests.add(cloneResourceRequest(offRackRequest));
}
/**
@@ -324,11 +355,13 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateOffSwitch(
- SchedulerNode node, Priority priority,
- ResourceRequest offSwitchRequest, Container container) {
+ synchronized private void allocateOffSwitch(SchedulerNode node,
+ Priority priority, ResourceRequest offSwitchRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
+ // resourceRequests is an output list. A single-container copy of the
+ // off-switch request is appended to it below.
// Update future requirements
decrementOutstanding(offSwitchRequest);
+ // Update cloned OffRack requests for recovery
+ resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
synchronized private void decrementOutstanding(
@@ -377,6 +410,7 @@ public class AppSchedulingInfo {
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
+ this.queueName = newQueue.getQueueName();
}
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
@@ -409,4 +443,29 @@ public class AppSchedulingInfo {
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
}
+
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // If there was any container to recover, the application was
+ // running from scheduler's POV.
+ pending = false;
+ metrics.runAppAttempt(applicationId, user);
+ }
+
+ // Container is completed. Skip recovering resources.
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+
+ metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+ false);
+ }
+
+  /**
+   * Creates a copy of {@code request} that asks for exactly one container.
+   * The copy keeps the priority, resource name, capability and relax-locality
+   * flag. No other fields are read from {@code request}.
+   */
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    return ResourceRequest.newInstance(request.getPriority(),
+        request.getResourceName(), request.getCapability(), 1,
+        request.getRelaxLocality());
+  }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@Evolving
@LimitedPrivate("yarn")
@@ -60,4 +62,13 @@ public interface Queue {
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
+
+ /**
+ * Recover the state of the queue for a given container.
+ * @param clusterResource the resource of the cluster
+ * @param schedulerAttempt the application for which the container was allocated
+ * @param rmContainer the container that was recovered.
+ */
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Tue Aug 19 23:49:39 2014
@@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res
@LimitedPrivate("yarn")
@Evolving
public interface ResourceScheduler extends YarnScheduler, Recoverable {
+
+  /**
+   * Sets the {@link RMContext} for this <code>ResourceScheduler</code>.
+   * Call it exactly once, immediately after the scheduler is instantiated.
+   *
+   * @param rmContext the context created by the ResourceManager
+   */
+  void setRMContext(RMContext rmContext);
+
/**
* Re-initialize the <code>ResourceScheduler</code>.
* @param conf configuration
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,11 @@ import org.apache.hadoop.yarn.server.res
@Private
@Unstable
-public class SchedulerApplication {
+public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue;
private final String user;
- private SchedulerApplicationAttempt currentAttempt;
+ private T currentAttempt;
public SchedulerApplication(Queue queue, String user) {
this.queue = queue;
@@ -46,11 +46,11 @@ public class SchedulerApplication {
return user;
}
+ /** @return the attempt currently held in {@code currentAttempt}. */
- public SchedulerApplicationAttempt getCurrentAppAttempt() {
+ public T getCurrentAppAttempt() {
return currentAttempt;
}
+ /** Replaces the attempt held in {@code currentAttempt}. */
- public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) {
+ public void setCurrentAppAttempt(T currentAttempt) {
this.currentAttempt = currentAttempt;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 23:49:39 2014
@@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -46,9 +49,11 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -76,10 +81,20 @@ public class SchedulerApplicationAttempt
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
+ // Resource recorded via setAMResource(); null until set. Presumably the AM
+ // container's resource -- TODO confirm against callers.
+ private Resource amResource;
+ // Defaults to true; the constructor overwrites it from the application's
+ // submission context when the RMApp and that context can be found.
+ private boolean unmanagedAM = true;
+ // Set through setAmRunning(); starts out false.
+ private boolean amRunning = false;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
+ // This pendingRelease is used in the work-preserving recovery scenario to
+ // keep track of the AM's outstanding release requests. On recovery, the RM
+ // could receive a release request from the AM before it receives the
+ // container status from the NM. In this case, the to-be-recovered containers
+ // reported by the NM should not be recovered.
+ private Set<ContainerId> pendingRelease = null;
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -101,11 +116,23 @@ public class SchedulerApplicationAttempt
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
+ // Guava's checkNotNull takes (reference, errorMessage): the reference must
+ // come first, otherwise the message literal is what gets null-checked and a
+ // null rmContext only fails later with a bare NPE.
+ Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager);
+ activeUsersManager, rmContext.getEpoch());
this.queue = queue;
+ this.pendingRelease = new HashSet<ContainerId>();
+ // Take the unmanaged-AM flag from the submission context when the RMApp is
+ // known; otherwise keep the field's default.
+ if (rmContext.getRMApps() != null &&
+ rmContext.getRMApps()
+ .containsKey(applicationAttemptId.getApplicationId())) {
+ ApplicationSubmissionContext appSubmissionContext =
+ rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+ .getApplicationSubmissionContext();
+ if (appSubmissionContext != null) {
+ unmanagedAM = appSubmissionContext.getUnmanagedAM();
+ }
+ }
}
/**
@@ -144,6 +171,10 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getResourceRequests(priority);
}
+ /**
+ * Returns the container IDs the AM has asked to release, tracked for
+ * work-preserving recovery. This is the internal set itself, not a copy, so
+ * callers can add to and remove from it.
+ */
+ public Set<ContainerId> getPendingRelease() {
+ return this.pendingRelease;
+ }
+
public int getNewContainerId() {
return appSchedulingInfo.getNewContainerId();
}
@@ -168,6 +199,26 @@ public class SchedulerApplicationAttempt
return appSchedulingInfo.getQueueName();
}
+ /**
+ * @return the resource recorded via setAMResource, or null if it has not
+ * been set
+ */
+ public Resource getAMResource() {
+ return amResource;
+ }
+
+ /**
+ * Records the AM resource for this attempt (presumably the AM container's
+ * resource -- TODO confirm against callers).
+ *
+ * @param amResource the resource to record
+ */
+ public void setAMResource(Resource amResource) {
+ this.amResource = amResource;
+ }
+
+ /** @return the flag last set via setAmRunning; false initially */
+ public boolean isAmRunning() {
+ return amRunning;
+ }
+
+ /** @param bool new value for the AM-running flag */
+ public void setAmRunning(boolean bool) {
+ amRunning = bool;
+ }
+
+ /**
+ * @return whether the application was submitted with an unmanaged AM, as
+ * read from its submission context at construction; true when that
+ * context was unavailable
+ */
+ public boolean getUnmanagedAM() {
+ return unmanagedAM;
+ }
+
public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id);
}
@@ -202,7 +253,14 @@ public class SchedulerApplicationAttempt
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
if (!isStopped) {
- appSchedulingInfo.updateResourceRequests(requests);
+ appSchedulingInfo.updateResourceRequests(requests, false);
+ }
+ }
+
+ /**
+ * Re-applies resource requests during recovery. Identical to
+ * updateResourceRequests except that it passes {@code true} as the second
+ * argument to AppSchedulingInfo#updateResourceRequests (presumably a
+ * "recovering" flag -- TODO confirm). Ignored once the attempt is stopped.
+ *
+ * @param requests the resource requests to re-apply
+ */
+ public synchronized void recoverResourceRequests(
+ List<ResourceRequest> requests) {
+ if (!isStopped) {
+ appSchedulingInfo.updateResourceRequests(requests, true);
}
}
@@ -378,7 +436,8 @@ public class SchedulerApplicationAttempt
// create container token and NMToken altogether.
+ // The container token is now also given the container's priority and the
+ // RMContainer's creation time (see the createContainerToken arguments).
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
- getUser(), container.getResource()));
+ getUser(), container.getResource(), container.getPriority(),
+ rmContainer.getCreationTime()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
@@ -499,5 +558,24 @@ public class SchedulerApplicationAttempt
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
- }
+ }
+
+ /**
+ * Rebuilds this attempt's bookkeeping for a container recovered during
+ * work-preserving recovery. AppSchedulingInfo is always told about the
+ * container. COMPLETED containers stop there; any other container is added
+ * to liveContainers and its resource is added to currentConsumption.
+ *
+ * @param rmContainer the recovered container
+ */
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ // recover app scheduling info
+ appSchedulingInfo.recoverContainer(rmContainer);
+
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ + " is recovering container " + rmContainer.getContainerId());
+ liveContainers.put(rmContainer.getContainerId(), rmContainer);
+ Resources.addTo(currentConsumption, rmContainer.getContainer()
+ .getResource());
+ // State not restored here:
+ // - resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+ // is called.
+ // - newlyAllocatedContainers: the recovered container is not re-added.
+ // - schedulingOpportunities and lastScheduledContainer: not recovered.
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Tue Aug 19 23:49:39 2014
@@ -18,11 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -31,59 +46,236 @@ import org.apache.hadoop.yarn.conf.YarnC
@Unstable
public abstract class SchedulerNode {
+ private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
+
+ // Resource still free on this node; starts as a copy of the node's total
+ // capability and is adjusted on allocate/release and by resource deltas.
+ private Resource availableResource = Resource.newInstance(0, 0);
+ // Resource held by containers allocated on this node.
+ private Resource usedResource = Resource.newInstance(0, 0);
+ // Copy of the node's total capability taken at construction time.
+ private Resource totalResourceCapability;
+ private RMContainer reservedContainer;
+ // volatile so getNumContainers() can read it without holding the lock;
+ // all writes happen inside synchronized methods.
+ private volatile int numContainers;
+
+
+ /* containers currently allocated on this node, keyed by container id */
+ private final Map<ContainerId, RMContainer> launchedContainers =
+ new HashMap<ContainerId, RMContainer>();
+
+ private final RMNode rmNode;
+ private final String nodeName;
+
+ /**
+ * Creates the scheduler's view of a node, with all of the node's total
+ * capability initially available.
+ *
+ * @param node the RM's record of the node
+ * @param usePortForNodeName if true the node name is "host:port",
+ * otherwise just the host name
+ */
+ public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+ this.rmNode = node;
+ this.availableResource = Resources.clone(node.getTotalCapability());
+ this.totalResourceCapability = Resources.clone(node.getTotalCapability());
+ if (usePortForNodeName) {
+ nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
+ } else {
+ nodeName = rmNode.getHostName();
+ }
+ }
+
+ /** @return the RMNode this scheduler node was built from */
+ public RMNode getRMNode() {
+ return this.rmNode;
+ }
+
+ /**
+ * Get the ID of the node which contains both its hostname and port.
+ *
+ * @return the ID of the node
+ */
+ public NodeId getNodeID() {
+ return this.rmNode.getNodeID();
+ }
+
+ /** @return the HTTP address reported by the underlying RMNode */
+ public String getHttpAddress() {
+ return this.rmNode.getHttpAddress();
+ }
+
/**
* Get the name of the node for scheduling matching decisions.
* <p/>
- * Typically this is the 'hostname' reported by the node, but it could be
- * configured to be 'hostname:port' reported by the node via the
+ * Typically this is the 'hostname' reported by the node, but it could be
+ * configured to be 'hostname:port' reported by the node via the
* {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
* The main usecase of this is Yarn minicluster to be able to differentiate
* node manager instances by their port number.
*
* @return name of the node for scheduling matching decisions.
*/
- public abstract String getNodeName();
-
+ public String getNodeName() {
+ return nodeName;
+ }
+
/**
* Get rackname.
+ *
* @return rackname
*/
- public abstract String getRackName();
-
+ public String getRackName() {
+ return this.rmNode.getRackName();
+ }
+
/**
- * Get used resources on the node.
- * @return used resources on the node
+ * Records a container the scheduler allocated on this node: its resource
+ * moves from available to used, the container count is incremented, and
+ * the container is tracked in launchedContainers.
+ *
+ * @param rmContainer
+ * allocated container
*/
- public abstract Resource getUsedResource();
+ public synchronized void allocateContainer(RMContainer rmContainer) {
+ Container container = rmContainer.getContainer();
+ deductAvailableResource(container.getResource());
+ ++numContainers;
+
+ launchedContainers.put(container.getId(), rmContainer);
+
+ LOG.info("Assigned container " + container.getId() + " of capacity "
+ + container.getResource() + " on host " + rmNode.getNodeAddress()
+ + ", which has " + numContainers + " containers, "
+ + getUsedResource() + " used and " + getAvailableResource()
+ + " available after allocation");
+ }
/**
* Get available resources on the node.
+ *
* @return available resources on the node
*/
- public abstract Resource getAvailableResource();
+ public synchronized Resource getAvailableResource() {
+ return this.availableResource;
+ }
/**
- * Get number of active containers on the node.
- * @return number of active containers on the node
- */
- public abstract int getNumContainers();
-
- /**
- * Apply delta resource on node's available resource.
- * @param deltaResource the delta of resource need to apply to node
+ * Get used resources on the node.
+ *
+ * @return used resources on the node
*/
- public abstract void applyDeltaOnAvailableResource(Resource deltaResource);
+ public synchronized Resource getUsedResource() {
+ return this.usedResource;
+ }
/**
* Get total resources on the node.
+ *
* @return total resources on the node.
*/
- public abstract Resource getTotalResource();
-
+ public Resource getTotalResource() {
+ return this.totalResourceCapability;
+ }
+
+ /**
+ * @param containerId container to look up
+ * @return true if the container is currently allocated on this node
+ */
+ public synchronized boolean isValidContainer(ContainerId containerId) {
+ if (launchedContainers.containsKey(containerId)) {
+ return true;
+ }
+ return false;
+ }
+
+ // Gives a released container's resource back to the node and decrements
+ // the container count.
+ private synchronized void updateResource(Container container) {
+ addAvailableResource(container.getResource());
+ --numContainers;
+ }
+
/**
- * Get the ID of the node which contains both its hostname and port.
- * @return the ID of the node
+ * Release an allocated container on this node. Containers not currently
+ * tracked on this node are logged as errors and ignored.
+ *
+ * @param container
+ * container to be released
+ */
+ public synchronized void releaseContainer(Container container) {
+ if (!isValidContainer(container.getId())) {
+ LOG.error("Invalid container released " + container);
+ return;
+ }
+
+ /* stop tracking the container and give its resource back to this node */
+ if (null != launchedContainers.remove(container.getId())) {
+ updateResource(container);
+ }
+
+ // NOTE(review): "release resources" below always logs the constant true.
+ LOG.info("Released container " + container.getId() + " of capacity "
+ + container.getResource() + " on host " + rmNode.getNodeAddress()
+ + ", which currently has " + numContainers + " containers, "
+ + getUsedResource() + " used and " + getAvailableResource()
+ + " available" + ", release resources=" + true);
+ }
+
+ // Moves resource from used back to available; a null resource is logged
+ // and ignored.
+ private synchronized void addAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid resource addition of null resource for "
+ + rmNode.getNodeAddress());
+ return;
+ }
+ Resources.addTo(availableResource, resource);
+ Resources.subtractFrom(usedResource, resource);
+ }
+
+ // Moves resource from available to used; a null resource is logged and
+ // ignored.
+ private synchronized void deductAvailableResource(Resource resource) {
+ if (resource == null) {
+ LOG.error("Invalid deduction of null resource for "
+ + rmNode.getNodeAddress());
+ return;
+ }
+ Resources.subtractFrom(availableResource, resource);
+ Resources.addTo(usedResource, resource);
+ }
+
+ /**
+ * Reserve container for the attempt on this node.
+ */
+ public abstract void reserveResource(SchedulerApplicationAttempt attempt,
+ Priority priority, RMContainer container);
+
+ /**
+ * Unreserve resources on this node.
*/
- public abstract NodeId getNodeID();
+ public abstract void unreserveResource(SchedulerApplicationAttempt attempt);
+
+ @Override
+ public String toString() {
+ return "host: " + rmNode.getNodeAddress() + " #containers="
+ + getNumContainers() + " available="
+ + getAvailableResource().getMemory() + " used="
+ + getUsedResource().getMemory();
+ }
+
+ /**
+ * Get number of active containers on the node.
+ *
+ * @return number of active containers on the node
+ */
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ /**
+ * @return a new list of the containers currently allocated on this node;
+ * later changes on the node are not reflected in it
+ */
+ public synchronized List<RMContainer> getRunningContainers() {
+ return new ArrayList<RMContainer>(launchedContainers.values());
+ }
+
+ /** @return the container reserved on this node, or null if none is set */
+ public synchronized RMContainer getReservedContainer() {
+ return reservedContainer;
+ }
+
+ /** @param reservedContainer the container to record as reserved here */
+ protected synchronized void
+ setReservedContainer(RMContainer reservedContainer) {
+ this.reservedContainer = reservedContainer;
+ }
+
+ /**
+ * Apply delta resource on node's available resource.
+ *
+ * @param deltaResource
+ * the delta of resource need to apply to node
+ */
+ public synchronized void
+ applyDeltaOnAvailableResource(Resource deltaResource) {
+ // we can only adjust available resource if total resource is changed.
+ // NOTE(review): totalResourceCapability is not updated here, so
+ // getTotalResource() keeps returning the capability captured at
+ // construction -- confirm that is intended.
+ Resources.addTo(this.availableResource, deltaResource);
+ }
+
+ /**
+ * Re-registers a recovered container on this node via allocateContainer;
+ * containers already in the COMPLETED state are ignored.
+ *
+ * @param rmContainer the recovered container
+ */
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ allocateContainer(rmContainer);
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Tue Aug 19 23:49:39 2014
@@ -153,14 +153,17 @@ public class SchedulerUtils {
* @param rmNode RMNode with new resource view
* @param clusterResource the cluster's resource that need to update
* @param log Scheduler's log for resource change
+ * @return true if the node's available resource changed and was updated
*/
- public static void updateResourceIfChanged(SchedulerNode node,
+ public static boolean updateResourceIfChanged(SchedulerNode node,
RMNode rmNode, Resource clusterResource, Log log) {
+ boolean result = false;
Resource oldAvailableResource = node.getAvailableResource();
Resource newAvailableResource = Resources.subtract(
rmNode.getTotalCapability(), node.getUsedResource());
if (!newAvailableResource.equals(oldAvailableResource)) {
+ result = true;
Resource deltaResource = Resources.subtract(newAvailableResource,
oldAvailableResource);
// Reflect resource change to scheduler node.
@@ -176,6 +179,8 @@ public class SchedulerUtils {
- + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
+ + " with delta: CPU: " + deltaResource.getVirtualCores() + "core, Memory: "
+ deltaResource.getMemory() +"MB");
}
+
+ return result;
}
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Tue Aug 19 23:49:39 2014
@@ -69,7 +69,15 @@ public interface YarnScheduler extends E
@Public
@Stable
public List<QueueUserACLInfo> getQueueUserAclInfo();
-
+
+ /**
+ * Get the whole resource capacity of the cluster.
+ * @return the whole resource capacity of the cluster.
+ */
+ @LimitedPrivate("yarn")
+ @Unstable
+ public Resource getClusterResource();
+
/**
* Get minimum allocatable {@link Resource}.
* @return minimum allocatable resource
@@ -182,7 +190,7 @@ public interface YarnScheduler extends E
@LimitedPrivate("yarn")
@Unstable
public RMContainer getRMContainer(ContainerId containerId);
-
+
/**
* Moves the given application to the given queue
* @param appId
@@ -194,4 +202,22 @@ public interface YarnScheduler extends E
@Evolving
public String moveApplication(ApplicationId appId, String newQueue)
throws YarnException;
+
+ /**
+ * Completely drain sourceQueue of applications, by moving all of them to
+ * destQueue.
+ *
+ * @param sourceQueue the queue whose applications are moved out
+ * @param destQueue the queue that receives the applications
+ * @throws YarnException
+ */
+ void moveAllApps(String sourceQueue, String destQueue) throws YarnException;
+
+ /**
+ * Terminate all applications in the specified queue.
+ *
+ * @param queueName the name of the queue whose applications are terminated
+ * @throws YarnException
+ */
+ void killAllAppsInQueue(String queueName) throws YarnException;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Tue Aug 19 23:49:39 2014
@@ -28,7 +28,6 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -235,17 +234,26 @@ extends org.apache.hadoop.yarn.server.re
public ActiveUsersManager getActiveUsersManager();
/**
- * Recover the state of the queue
- * @param clusterResource the resource of the cluster
- * @param application the application for which the container was allocated
- * @param container the container that was recovered.
- */
- public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
- Container container);
-
- /**
* Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to
*/
public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
+
+ /**
+ * Detach a container from this queue.
+ * NOTE(review): presumably paired with attachContainer on another queue
+ * when an application is moved between queues -- confirm with callers.
+ * @param clusterResource the current cluster resource
+ * @param application application to which the container was assigned
+ * @param container the container to detach
+ */
+ public void detachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer container);
+
+ /**
+ * Attach a container to this queue.
+ * @param clusterResource the current cluster resource
+ * @param application application to which the container was assigned
+ * @param container the container to attach
+ */
+ public void attachContainer(Resource clusterResource,
+ FiCaSchedulerApp application, RMContainer container);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou
class CSQueueUtils {
+ // Used for debug tracing in getAbsoluteMaxAvailCapacity.
+ private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
+
final static float EPSILON = 0.0001f;
public static void checkMaxCapacity(String queueName,
@@ -113,4 +118,52 @@ class CSQueueUtils {
)
);
}
+
+ /**
+ * Computes the largest fraction of the cluster that {@code queue} could use
+ * given current usage. For a queue without a parent this is simply its
+ * absolute maximum capacity. Otherwise it is the smaller of the queue's
+ * maximum capacity and the share of the parent left unused by its siblings,
+ * multiplied by the parent's own (recursively computed) value and clamped
+ * to [0, 1]. Returns 0 when the parent's available resource is not a valid
+ * divisor.
+ *
+ * @param resourceCalculator calculator used for the resource ratios
+ * @param clusterResource total cluster resource
+ * @param queue the queue to evaluate
+ * @return the queue's absolute (cluster-relative) max available capacity
+ */
+ public static float getAbsoluteMaxAvailCapacity(
+ ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
+ CSQueue parent = queue.getParent();
+ if (parent == null) {
+ return queue.getAbsoluteMaximumCapacity();
+ }
+
+ //Get my parent's max avail, needed to determine my own
+ float parentMaxAvail = getAbsoluteMaxAvailCapacity(
+ resourceCalculator, clusterResource, parent);
+ //...and as a resource
+ Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
+
+ //check for no resources parent before dividing, if so, max avail is none
+ if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
+ return 0.0f;
+ }
+ //sibling used is parent used - my used...
+ float siblingUsedCapacity = Resources.ratio(
+ resourceCalculator,
+ Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
+ parentResource);
+ //my max avail is the lesser of my max capacity and what is unused from my parent
+ //by my siblings (if they are beyond their base capacity)
+ float maxAvail = Math.min(
+ queue.getMaximumCapacity(),
+ 1.0f - siblingUsedCapacity);
+ //and, multiply by parent to get absolute (cluster relative) value
+ float absoluteMaxAvail = maxAvail * parentMaxAvail;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("qpath " + queue.getQueuePath());
+ LOG.debug("parentMaxAvail " + parentMaxAvail);
+ LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
+ LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
+ LOG.debug("maxAvail " + maxAvail);
+ LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
+ }
+
+ if (absoluteMaxAvail < 0.0f) {
+ absoluteMaxAvail = 0.0f;
+ } else if (absoluteMaxAvail > 1.0f) {
+ absoluteMaxAvail = 1.0f;
+ }
+
+ return absoluteMaxAvail;
+ }
}