You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/12/31 00:42:08 UTC
[3/3] hadoop git commit: YARN-4524. Cleanup AppSchedulingInfo.
(Karthik Kambatla via wangda)
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/05fa852d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/05fa852d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/05fa852d
Branch: refs/heads/branch-2
Commit: 05fa852d7567b7590d6b53bbf925f8f424736514
Parents: 6eefae1
Author: Wangda Tan <wa...@apache.org>
Authored: Wed Dec 30 15:36:55 2015 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Dec 30 15:36:55 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../scheduler/AppSchedulingInfo.java | 379 +++++++++----------
.../scheduler/SchedulerApplicationAttempt.java | 2 +-
3 files changed, 174 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c208c17..146ed62 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -26,6 +26,8 @@ Release 2.9.0 - UNRELEASED
YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
+ YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index c5f8cd1..41d3fd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -56,40 +56,36 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class AppSchedulingInfo {
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
+ private static final Comparator COMPARATOR =
+ new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
+ private static final int EPOCH_BIT_SHIFT = 40;
+
+ private final ApplicationId applicationId;
private final ApplicationAttemptId applicationAttemptId;
- final ApplicationId applicationId;
- private String queueName;
- Queue queue;
- final String user;
- // TODO making containerIdCounter long
private final AtomicLong containerIdCounter;
- private final int EPOCH_BIT_SHIFT = 40;
+ private final String user;
+
+ private Queue queue;
+ private ActiveUsersManager activeUsersManager;
+ private boolean pending = true; // whether accepted/allocated by scheduler
+ private ResourceUsage appResourceUsage;
+
+ private final Set<String> amBlacklist = new HashSet<>();
+ private Set<String> userBlacklist = new HashSet<>();
- final Set<Priority> priorities = new TreeSet<Priority>(
- new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
+ final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
- new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
- final Map<NodeId, Map<Priority, Map<ContainerId,
- SchedContainerChangeRequest>>> increaseRequestMap =
new ConcurrentHashMap<>();
- private Set<String> userBlacklist = new HashSet<>();
- private Set<String> amBlacklist = new HashSet<>();
+ final Map<NodeId, Map<Priority, Map<ContainerId,
+ SchedContainerChangeRequest>>> containerIncreaseRequestMap =
+ new ConcurrentHashMap<>();
- //private final ApplicationStore store;
- private ActiveUsersManager activeUsersManager;
-
- /* Allocated by scheduler */
- boolean pending = true; // for app metrics
-
- private ResourceUsage appResourceUsage;
-
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
- this.queueName = queue.getQueueName();
this.user = user;
this.activeUsersManager = activeUsersManager;
this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
@@ -104,14 +100,18 @@ public class AppSchedulingInfo {
return applicationAttemptId;
}
- public String getQueueName() {
- return queueName;
- }
-
public String getUser() {
return user;
}
+ public long getNewContainerId() {
+ return this.containerIdCounter.incrementAndGet();
+ }
+
+ public synchronized String getQueueName() {
+ return queue.getQueueName();
+ }
+
public synchronized boolean isPending() {
return pending;
}
@@ -125,30 +125,23 @@ public class AppSchedulingInfo {
LOG.info("Application " + applicationId + " requests cleared");
}
- public long getNewContainerId() {
- return this.containerIdCounter.incrementAndGet();
- }
-
- public boolean hasIncreaseRequest(NodeId nodeId) {
+ public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return false;
- }
- return requestsOnNode.size() > 0;
+ containerIncreaseRequestMap.get(nodeId);
+ return requestsOnNode == null ? false : requestsOnNode.size() > 0;
}
- public Map<ContainerId, SchedContainerChangeRequest>
+ public synchronized Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, Priority priority) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return null;
- }
-
- return requestsOnNode.get(priority);
+ containerIncreaseRequestMap.get(nodeId);
+ return requestsOnNode == null ? null : requestsOnNode.get(priority);
}
+ /**
+ * return true if any of the existing increase requests are updated,
+ * false if none of them are updated
+ */
public synchronized boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false;
@@ -157,10 +150,10 @@ public class AppSchedulingInfo {
NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
+ containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
requestsOnNode = new TreeMap<>();
- increaseRequestMap.put(nodeId, requestsOnNode);
+ containerIncreaseRequestMap.put(nodeId, requestsOnNode);
}
SchedContainerChangeRequest prevChangeRequest =
@@ -168,22 +161,21 @@ public class AppSchedulingInfo {
if (null != prevChangeRequest) {
if (Resources.equals(prevChangeRequest.getTargetCapacity(),
r.getTargetCapacity())) {
- // New target capacity is as same as what we have, just ignore the new
- // one
+ // increase request hasn't changed
continue;
}
- // remove the old one
+ // remove the old one, as we will use the new one going forward
removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
prevChangeRequest.getContainerId());
}
- if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) {
+ if (Resources.equals(r.getTargetCapacity(),
+ r.getRMContainer().getAllocatedResource())) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to increase/decrease container, "
- + "target capacity = previous capacity = " + prevChangeRequest
- + " for container=" + r.getContainerId()
- + ". Will ignore this increase request");
+ LOG.debug("Trying to increase container " + r.getContainerId()
+ + ", target capacity = previous capacity = " + prevChangeRequest
+ + ". Will ignore this increase request.");
}
continue;
}
@@ -195,25 +187,26 @@ public class AppSchedulingInfo {
return resourceUpdated;
}
- // insert increase request and add missing hierarchy if missing
+ /**
+ * Insert increase request, adding any missing items in the data-structure
+ * hierarchy.
+ */
private void insertIncreaseRequest(SchedContainerChangeRequest request) {
NodeId nodeId = request.getNodeId();
Priority priority = request.getPriority();
ContainerId containerId = request.getContainerId();
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
+ containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
- requestsOnNode =
- new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>();
- increaseRequestMap.put(nodeId, requestsOnNode);
+ requestsOnNode = new HashMap<>();
+ containerIncreaseRequestMap.put(nodeId, requestsOnNode);
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(priority);
if (null == requestsOnNodeWithPriority) {
- requestsOnNodeWithPriority =
- new TreeMap<ContainerId, SchedContainerChangeRequest>();
+ requestsOnNodeWithPriority = new TreeMap<>();
requestsOnNode.put(priority, requestsOnNodeWithPriority);
}
@@ -237,7 +230,7 @@ public class AppSchedulingInfo {
public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
ContainerId containerId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
+ containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
return false;
}
@@ -256,7 +249,7 @@ public class AppSchedulingInfo {
requestsOnNode.remove(priority);
}
if (requestsOnNode.isEmpty()) {
- increaseRequestMap.remove(nodeId);
+ containerIncreaseRequestMap.remove(nodeId);
}
if (request == null) {
@@ -279,18 +272,15 @@ public class AppSchedulingInfo {
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
Priority priority, ContainerId containerId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
- increaseRequestMap.get(nodeId);
+ containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
return null;
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(priority);
- if (null == requestsOnNodeWithPriority) {
- return null;
- }
-
- return requestsOnNodeWithPriority.get(containerId);
+ return requestsOnNodeWithPriority == null ? null
+ : requestsOnNodeWithPriority.get(containerId);
}
/**
@@ -299,121 +289,120 @@ public class AppSchedulingInfo {
* by the application.
*
* @param requests resources to be acquired
- * @param recoverPreemptedRequest recover Resource Request on preemption
- * @return true if any resource was updated, false else
+ * @param recoverPreemptedRequest recover ResourceRequest on preemption
+ * @return true if any resource was updated, false otherwise
*/
- synchronized public boolean updateResourceRequests(
+ public synchronized boolean updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
- QueueMetrics metrics = queue.getMetrics();
-
+ // Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false;
// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
String resourceName = request.getResourceName();
- boolean updatePendingResources = false;
- ResourceRequest lastRequest = null;
- if (resourceName.equals(ResourceRequest.ANY)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("update:" + " application=" + applicationId + " request="
- + request);
- }
- updatePendingResources = true;
- anyResourcesUpdated = true;
-
- // Premature optimization?
- // Assumes that we won't see more than one priority request updated
- // in one call, reasonable assumption... however, it's totally safe
- // to activate same application more than once.
- // Thus we don't need another loop ala the one in decrementOutstanding()
- // which is needed during deactivate.
- if (request.getNumContainers() > 0) {
- activeUsersManager.activateApplication(user, applicationId);
- }
- ResourceRequest previousAnyRequest =
- getResourceRequest(priority, resourceName);
-
- // When there is change in ANY request label expression, we should
- // update label for all resource requests already added of same
- // priority as ANY resource request.
- if ((null == previousAnyRequest)
- || isRequestLabelChanged(previousAnyRequest, request)) {
- Map<String, ResourceRequest> resourceRequest =
- getResourceRequests(priority);
- if (resourceRequest != null) {
- for (ResourceRequest r : resourceRequest.values()) {
- if (!r.getResourceName().equals(ResourceRequest.ANY)) {
- r.setNodeLabelExpression(request.getNodeLabelExpression());
- }
- }
- }
- }
- } else {
- ResourceRequest anyRequest =
- getResourceRequest(priority, ResourceRequest.ANY);
- if (anyRequest != null) {
- request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
- }
- }
+ // Update node labels if required
+ updateNodeLabels(request);
Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
-
if (asks == null) {
- asks = new ConcurrentHashMap<String, ResourceRequest>();
+ asks = new ConcurrentHashMap<>();
this.resourceRequestMap.put(priority, asks);
this.priorities.add(priority);
}
- lastRequest = asks.get(resourceName);
+ // Increment number of containers if recovering preempted resources
+ ResourceRequest lastRequest = asks.get(resourceName);
if (recoverPreemptedRequest && lastRequest != null) {
- // Increment the number of containers to 1, as it is recovering a
- // single container.
request.setNumContainers(lastRequest.getNumContainers() + 1);
}
+ // Update asks
asks.put(resourceName, request);
- if (updatePendingResources) {
-
- // Similarly, deactivate application?
- if (request.getNumContainers() <= 0) {
- LOG.info("checking for deactivate of application :"
- + this.applicationId);
- checkForDeactivation();
- }
-
- int lastRequestContainers = lastRequest != null ? lastRequest
- .getNumContainers() : 0;
- Resource lastRequestCapability = lastRequest != null ? lastRequest
- .getCapability() : Resources.none();
- metrics.incrPendingResources(user, request.getNumContainers(),
- request.getCapability());
- metrics.decrPendingResources(user, lastRequestContainers,
- lastRequestCapability);
-
- // update queue:
- Resource increasedResource =
- Resources.multiply(request.getCapability(),
- request.getNumContainers());
- queue.incPendingResource(request.getNodeLabelExpression(),
- increasedResource);
- appResourceUsage.incPending(request.getNodeLabelExpression(),
- increasedResource);
- if (lastRequest != null) {
- Resource decreasedResource =
- Resources.multiply(lastRequestCapability, lastRequestContainers);
- queue.decPendingResource(lastRequest.getNodeLabelExpression(),
- decreasedResource);
- appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
- decreasedResource);
+
+ if (resourceName.equals(ResourceRequest.ANY)) {
+ anyResourcesUpdated = true;
+
+ // Activate application. Metrics activation is done here.
+ // TODO: Shouldn't we activate even if numContainers = 0?
+ if (request.getNumContainers() > 0) {
+ activeUsersManager.activateApplication(user, applicationId);
}
+
+ // Update pendingResources
+ updatePendingResources(lastRequest, request, queue.getMetrics());
}
}
return anyResourcesUpdated;
}
- private boolean isRequestLabelChanged(ResourceRequest requestOne,
+ private void updatePendingResources(ResourceRequest lastRequest,
+ ResourceRequest request, QueueMetrics metrics) {
+ if (request.getNumContainers() <= 0) {
+ LOG.info("checking for deactivate of application :"
+ + this.applicationId);
+ checkForDeactivation();
+ }
+
+ int lastRequestContainers =
+ (lastRequest != null) ? lastRequest.getNumContainers() : 0;
+ Resource lastRequestCapability =
+ lastRequest != null ? lastRequest.getCapability() : Resources.none();
+ metrics.incrPendingResources(user,
+ request.getNumContainers(), request.getCapability());
+ metrics.decrPendingResources(user,
+ lastRequestContainers, lastRequestCapability);
+
+ // update queue:
+ Resource increasedResource =
+ Resources.multiply(request.getCapability(), request.getNumContainers());
+ queue.incPendingResource(request.getNodeLabelExpression(),
+ increasedResource);
+ appResourceUsage.incPending(request.getNodeLabelExpression(),
+ increasedResource);
+ if (lastRequest != null) {
+ Resource decreasedResource =
+ Resources.multiply(lastRequestCapability, lastRequestContainers);
+ queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+ decreasedResource);
+ appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+ decreasedResource);
+ }
+ }
+
+ private void updateNodeLabels(ResourceRequest request) {
+ Priority priority = request.getPriority();
+ String resourceName = request.getResourceName();
+ if (resourceName.equals(ResourceRequest.ANY)) {
+ ResourceRequest previousAnyRequest =
+ getResourceRequest(priority, resourceName);
+
+ // When there is change in ANY request label expression, we should
+ // update label for all resource requests already added of same
+ // priority as ANY resource request.
+ if ((null == previousAnyRequest)
+ || hasRequestLabelChanged(previousAnyRequest, request)) {
+ Map<String, ResourceRequest> resourceRequest =
+ getResourceRequests(priority);
+ if (resourceRequest != null) {
+ for (ResourceRequest r : resourceRequest.values()) {
+ if (!r.getResourceName().equals(ResourceRequest.ANY)) {
+ r.setNodeLabelExpression(request.getNodeLabelExpression());
+ }
+ }
+ }
+ }
+ } else {
+ ResourceRequest anyRequest =
+ getResourceRequest(priority, ResourceRequest.ANY);
+ if (anyRequest != null) {
+ request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
+ }
+ }
+ }
+
+ private boolean hasRequestLabelChanged(ResourceRequest requestOne,
ResourceRequest requestTwo) {
String requestOneLabelExp = requestOne.getNodeLabelExpression();
String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
@@ -465,24 +454,24 @@ public class AppSchedulingInfo {
}
}
- synchronized public Collection<Priority> getPriorities() {
+ public synchronized Collection<Priority> getPriorities() {
return priorities;
}
- synchronized public Map<String, ResourceRequest> getResourceRequests(
+ public synchronized Map<String, ResourceRequest> getResourceRequests(
Priority priority) {
return resourceRequestMap.get(priority);
}
- public List<ResourceRequest> getAllResourceRequests() {
- List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+ public synchronized List<ResourceRequest> getAllResourceRequests() {
+ List<ResourceRequest> ret = new ArrayList<>();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
ret.addAll(r.values());
}
return ret;
}
- synchronized public ResourceRequest getResourceRequest(Priority priority,
+ public synchronized ResourceRequest getResourceRequest(Priority priority,
String resourceName) {
Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
@@ -511,7 +500,7 @@ public class AppSchedulingInfo {
}
}
}
-
+
public synchronized void increaseContainer(
SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId();
@@ -559,28 +548,17 @@ public class AppSchedulingInfo {
/**
* Resources have been allocated to this application by the resource
* scheduler. Track them.
- *
- * @param type
- * the type of the node
- * @param node
- * the nodeinfo of the node
- * @param priority
- * the priority of the request.
- * @param request
- * the request
- * @param container
- * the containers allocated.
*/
- synchronized public List<ResourceRequest> allocate(NodeType type,
+ public synchronized List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, Priority priority, ResourceRequest request,
- Container container) {
- List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
+ Container containerAllocated) {
+ List<ResourceRequest> resourceRequests = new ArrayList<>();
if (type == NodeType.NODE_LOCAL) {
- allocateNodeLocal(node, priority, request, container, resourceRequests);
+ allocateNodeLocal(node, priority, request, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
- allocateRackLocal(node, priority, request, container, resourceRequests);
+ allocateRackLocal(node, priority, request, resourceRequests);
} else {
- allocateOffSwitch(node, priority, request, container, resourceRequests);
+ allocateOffSwitch(request, resourceRequests);
}
QueueMetrics metrics = queue.getMetrics();
if (pending) {
@@ -592,8 +570,8 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId
- + " container=" + container.getId()
- + " host=" + container.getNodeId().toString()
+ + " container=" + containerAllocated.getId()
+ + " host=" + containerAllocated.getNodeId().toString()
+ " user=" + user
+ " resource=" + request.getCapability()
+ " type=" + type);
@@ -606,12 +584,9 @@ public class AppSchedulingInfo {
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
- *
- * @param allocatedContainers
- * resources allocated to the application
*/
- synchronized private void allocateNodeLocal(SchedulerNode node,
- Priority priority, ResourceRequest nodeLocalRequest, Container container,
+ private synchronized void allocateNodeLocal(SchedulerNode node,
+ Priority priority, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
@@ -641,12 +616,9 @@ public class AppSchedulingInfo {
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
- *
- * @param allocatedContainers
- * resources allocated to the application
*/
- synchronized private void allocateRackLocal(SchedulerNode node,
- Priority priority, ResourceRequest rackLocalRequest, Container container,
+ private synchronized void allocateRackLocal(SchedulerNode node,
+ Priority priority, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getRackName(), priority, rackLocalRequest);
@@ -663,20 +635,16 @@ public class AppSchedulingInfo {
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
- *
- * @param allocatedContainers
- * resources allocated to the application
*/
- synchronized private void allocateOffSwitch(SchedulerNode node,
- Priority priority, ResourceRequest offSwitchRequest, Container container,
- List<ResourceRequest> resourceRequests) {
+ private synchronized void allocateOffSwitch(
+ ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
// Update future requirements
decrementOutstanding(offSwitchRequest);
// Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
- synchronized private void decrementOutstanding(
+ private synchronized void decrementOutstanding(
ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
@@ -695,7 +663,7 @@ public class AppSchedulingInfo {
offSwitchRequest.getCapability());
}
- synchronized private void checkForDeactivation() {
+ private synchronized void checkForDeactivation() {
boolean deactivate = true;
for (Priority priority : getPriorities()) {
ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
@@ -709,7 +677,7 @@ public class AppSchedulingInfo {
// also we need to check increase request
if (!deactivate) {
- deactivate = increaseRequestMap.isEmpty();
+ deactivate = containerIncreaseRequestMap.isEmpty();
}
if (deactivate) {
@@ -717,7 +685,7 @@ public class AppSchedulingInfo {
}
}
- synchronized public void move(Queue newQueue) {
+ public synchronized void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -741,10 +709,9 @@ public class AppSchedulingInfo {
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
- this.queueName = newQueue.getQueueName();
}
- synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+ public synchronized void stop() {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@@ -782,12 +749,8 @@ public class AppSchedulingInfo {
public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
- // this.priorities = appInfo.getPriorities();
- // this.requests = appInfo.getRequests();
// This should not require locking the userBlacklist since it will not be
// used by this instance until after setCurrentAppAttempt.
- // Should cleanup this to avoid sharing between instances and can
- // then remove getBlacklist as well.
this.userBlacklist = appInfo.getBlackList();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/05fa852d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 09f3598..4d81350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -331,7 +331,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Cleanup all scheduling information
isStopped = true;
- appSchedulingInfo.stop(rmAppAttemptFinalState);
+ appSchedulingInfo.stop();
}
public synchronized boolean isStopped() {