You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:24 UTC
[11/57] [abbrv] hadoop git commit: YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)
YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1831be8e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1831be8e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1831be8e
Branch: refs/heads/HDFS-10467
Commit: 1831be8e737fd423a9f3d590767b944147e85641
Parents: 875062b
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Sep 27 11:54:55 2016 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Sep 27 11:54:55 2016 -0700
----------------------------------------------------------------------
.../scheduler/AppSchedulingInfo.java | 619 +++++++++++--------
1 file changed, 356 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1831be8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 39820f7..59a6650 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,8 @@ public class AppSchedulingInfo {
private Queue queue;
private ActiveUsersManager activeUsersManager;
- private boolean pending = true; // whether accepted/allocated by scheduler
+ // whether accepted/allocated by scheduler
+ private volatile boolean pending = true;
private ResourceUsage appResourceUsage;
private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
@@ -86,6 +88,9 @@ public class AppSchedulingInfo {
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock.ReadLock readLock;
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) {
@@ -97,6 +102,10 @@ public class AppSchedulingInfo {
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
+
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
}
public ApplicationId getApplicationId() {
@@ -115,14 +124,19 @@ public class AppSchedulingInfo {
return this.containerIdCounter.incrementAndGet();
}
- public synchronized String getQueueName() {
- return queue.getQueueName();
+ public String getQueueName() {
+ try {
+ this.readLock.lock();
+ return queue.getQueueName();
+ } finally {
+ this.readLock.unlock();
+ }
}
- public synchronized boolean isPending() {
+ public boolean isPending() {
return pending;
}
-
+
public Set<String> getRequestedPartitions() {
return requestedPartitions;
}
@@ -130,88 +144,103 @@ public class AppSchedulingInfo {
/**
* Clear any pending requests from this application.
*/
- private synchronized void clearRequests() {
+ private void clearRequests() {
schedulerKeys.clear();
resourceRequestMap.clear();
LOG.info("Application " + applicationId + " requests cleared");
}
- public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? false : requestsOnNode.size() > 0;
+ public boolean hasIncreaseRequest(NodeId nodeId) {
+ try {
+ this.readLock.lock();
+ Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+ requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+ return requestsOnNode == null ? false : requestsOnNode.size() > 0;
+ } finally {
+ this.readLock.unlock();
+ }
}
- public synchronized Map<ContainerId, SchedContainerChangeRequest>
+ public Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? null : requestsOnNode.get(
- schedulerKey);
+ try {
+ this.readLock.lock();
+ Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+ requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+ return requestsOnNode == null ? null : requestsOnNode.get(
+ schedulerKey);
+ } finally {
+ this.readLock.unlock();
+ }
}
/**
* return true if any of the existing increase requests are updated,
* false if none of them are updated
*/
- public synchronized boolean updateIncreaseRequests(
+ public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false;
- for (SchedContainerChangeRequest r : increaseRequests) {
- if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
- LOG.warn("rmContainer's state is not RUNNING, for increase request with"
- + " container-id=" + r.getContainerId());
- continue;
- }
- try {
- RMServerUtils.checkSchedContainerChangeRequest(r, true);
- } catch (YarnException e) {
- LOG.warn("Error happens when checking increase request, Ignoring.."
- + " exception=", e);
- continue;
- }
- NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- requestsOnNode = new TreeMap<>();
- containerIncreaseRequestMap.put(nodeId, requestsOnNode);
- }
-
- SchedContainerChangeRequest prevChangeRequest =
- getIncreaseRequest(nodeId,
- r.getRMContainer().getAllocatedSchedulerKey(),
- r.getContainerId());
- if (null != prevChangeRequest) {
- if (Resources.equals(prevChangeRequest.getTargetCapacity(),
- r.getTargetCapacity())) {
- // increase request hasn't changed
+ try {
+ this.writeLock.lock();
+ for (SchedContainerChangeRequest r : increaseRequests) {
+ if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
+ LOG.warn("rmContainer's state is not RUNNING, for increase request"
+ + " with container-id=" + r.getContainerId());
continue;
}
+ try {
+ RMServerUtils.checkSchedContainerChangeRequest(r, true);
+ } catch (YarnException e) {
+ LOG.warn("Error happens when checking increase request, Ignoring.."
+ + " exception=", e);
+ continue;
+ }
+ NodeId nodeId = r.getRMContainer().getAllocatedNode();
- // remove the old one, as we will use the new one going forward
- removeIncreaseRequest(nodeId,
- prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
- prevChangeRequest.getContainerId());
- }
+ Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+ requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ requestsOnNode = new TreeMap<>();
+ containerIncreaseRequestMap.put(nodeId, requestsOnNode);
+ }
- if (Resources.equals(r.getTargetCapacity(),
- r.getRMContainer().getAllocatedResource())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to increase container " + r.getContainerId()
- + ", target capacity = previous capacity = " + prevChangeRequest
- + ". Will ignore this increase request.");
+ SchedContainerChangeRequest prevChangeRequest =
+ getIncreaseRequest(nodeId,
+ r.getRMContainer().getAllocatedSchedulerKey(),
+ r.getContainerId());
+ if (null != prevChangeRequest) {
+ if (Resources.equals(prevChangeRequest.getTargetCapacity(),
+ r.getTargetCapacity())) {
+ // increase request hasn't changed
+ continue;
+ }
+
+ // remove the old one, as we will use the new one going forward
+ removeIncreaseRequest(nodeId,
+ prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
+ prevChangeRequest.getContainerId());
}
- continue;
- }
- // add the new one
- resourceUpdated = true;
- insertIncreaseRequest(r);
+ if (Resources.equals(r.getTargetCapacity(),
+ r.getRMContainer().getAllocatedResource())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to increase container " + r.getContainerId()
+ + ", target capacity = previous capacity = " + prevChangeRequest
+ + ". Will ignore this increase request.");
+ }
+ continue;
+ }
+
+ // add the new one
+ resourceUpdated = true;
+ insertIncreaseRequest(r);
+ }
+ return resourceUpdated;
+ } finally {
+ this.writeLock.unlock();
}
- return resourceUpdated;
}
/**
@@ -275,61 +304,71 @@ public class AppSchedulingInfo {
}
}
- public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+ public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return false;
- }
+ try {
+ this.writeLock.lock();
+ Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+ requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return false;
+ }
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- if (null == requestsOnNodeWithPriority) {
- return false;
- }
+ Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+ requestsOnNode.get(schedulerKey);
+ if (null == requestsOnNodeWithPriority) {
+ return false;
+ }
- SchedContainerChangeRequest request =
- requestsOnNodeWithPriority.remove(containerId);
-
- // remove hierarchies if it becomes empty
- if (requestsOnNodeWithPriority.isEmpty()) {
- requestsOnNode.remove(schedulerKey);
- decrementSchedulerKeyReference(schedulerKey);
- }
- if (requestsOnNode.isEmpty()) {
- containerIncreaseRequestMap.remove(nodeId);
- }
+ SchedContainerChangeRequest request =
+ requestsOnNodeWithPriority.remove(containerId);
- if (request == null) {
- return false;
- }
+ // remove hierarchies if it becomes empty
+ if (requestsOnNodeWithPriority.isEmpty()) {
+ requestsOnNode.remove(schedulerKey);
+ decrementSchedulerKeyReference(schedulerKey);
+ }
+ if (requestsOnNode.isEmpty()) {
+ containerIncreaseRequestMap.remove(nodeId);
+ }
- // update queue's pending resource if request exists
- String partition = request.getRMContainer().getNodeLabelExpression();
- Resource delta = request.getDeltaCapacity();
- appResourceUsage.decPending(partition, delta);
- queue.decPendingResource(partition, delta);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remove increase request:" + request);
+ if (request == null) {
+ return false;
+ }
+
+ // update queue's pending resource if request exists
+ String partition = request.getRMContainer().getNodeLabelExpression();
+ Resource delta = request.getDeltaCapacity();
+ appResourceUsage.decPending(partition, delta);
+ queue.decPendingResource(partition, delta);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove increase request:" + request);
+ }
+
+ return true;
+ } finally {
+ this.writeLock.unlock();
}
-
- return true;
}
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return null;
- }
+ try {
+ this.readLock.lock();
+ Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+ requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return null;
+ }
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- return requestsOnNodeWithPriority == null ? null
- : requestsOnNodeWithPriority.get(containerId);
+ Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+ requestsOnNode.get(schedulerKey);
+ return requestsOnNodeWithPriority == null ? null
+ : requestsOnNodeWithPriority.get(containerId);
+ } finally {
+ this.readLock.unlock();
+ }
}
/**
@@ -343,49 +382,54 @@ public class AppSchedulingInfo {
* recover ResourceRequest on preemption
* @return true if any resource was updated, false otherwise
*/
- public synchronized boolean updateResourceRequests(
- List<ResourceRequest> requests,
+ public boolean updateResourceRequests(List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
// Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false;
- // Update resource requests
- for (ResourceRequest request : requests) {
- SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
- String resourceName = request.getResourceName();
-
- // Update node labels if required
- updateNodeLabels(request);
-
- Map<String, ResourceRequest> asks =
- this.resourceRequestMap.get(schedulerKey);
- if (asks == null) {
- asks = new ConcurrentHashMap<>();
- this.resourceRequestMap.put(schedulerKey, asks);
- }
+ try {
+ this.writeLock.lock();
+ // Update resource requests
+ for (ResourceRequest request : requests) {
+ SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
+ String resourceName = request.getResourceName();
+
+ // Update node labels if required
+ updateNodeLabels(request);
+
+ Map<String, ResourceRequest> asks =
+ this.resourceRequestMap.get(schedulerKey);
+ if (asks == null) {
+ asks = new ConcurrentHashMap<>();
+ this.resourceRequestMap.put(schedulerKey, asks);
+ }
- // Increment number of containers if recovering preempted resources
- ResourceRequest lastRequest = asks.get(resourceName);
- if (recoverPreemptedRequestForAContainer && lastRequest != null) {
- request.setNumContainers(lastRequest.getNumContainers() + 1);
- }
+ // Increment number of containers if recovering preempted resources
+ ResourceRequest lastRequest = asks.get(resourceName);
+ if (recoverPreemptedRequestForAContainer && lastRequest != null) {
+ request.setNumContainers(lastRequest.getNumContainers() + 1);
+ }
- // Update asks
- asks.put(resourceName, request);
+ // Update asks
+ asks.put(resourceName, request);
- if (resourceName.equals(ResourceRequest.ANY)) {
- //update the applications requested labels set
- requestedPartitions.add(request.getNodeLabelExpression() == null
- ? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression());
+ if (resourceName.equals(ResourceRequest.ANY)) {
+ //update the applications requested labels set
+ requestedPartitions.add(request.getNodeLabelExpression() == null
+ ? RMNodeLabelsManager.NO_LABEL :
+ request.getNodeLabelExpression());
- anyResourcesUpdated = true;
+ anyResourcesUpdated = true;
- // Update pendingResources
- updatePendingResources(lastRequest, request, schedulerKey,
- queue.getMetrics());
+ // Update pendingResources
+ updatePendingResources(lastRequest, request, schedulerKey,
+ queue.getMetrics());
+ }
}
+ return anyResourcesUpdated;
+ } finally {
+ this.writeLock.unlock();
}
- return anyResourcesUpdated;
}
private void updatePendingResources(ResourceRequest lastRequest,
@@ -529,34 +573,49 @@ public class AppSchedulingInfo {
return userBlacklistChanged.getAndSet(false);
}
- public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
+ public Collection<SchedulerRequestKey> getSchedulerKeys() {
return schedulerKeys.keySet();
}
- public synchronized Map<String, ResourceRequest> getResourceRequests(
+ public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerKey) {
return resourceRequestMap.get(schedulerKey);
}
- public synchronized List<ResourceRequest> getAllResourceRequests() {
+ public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<>();
- for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
- ret.addAll(r.values());
+ try {
+ this.readLock.lock();
+ for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
+ ret.addAll(r.values());
+ }
+ } finally {
+ this.readLock.unlock();
}
return ret;
}
- public synchronized ResourceRequest getResourceRequest(
- SchedulerRequestKey schedulerKey, String resourceName) {
- Map<String, ResourceRequest> nodeRequests =
- resourceRequestMap.get(schedulerKey);
- return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
+ public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey,
+ String resourceName) {
+ try {
+ this.readLock.lock();
+ Map<String, ResourceRequest> nodeRequests =
+ resourceRequestMap.get(schedulerKey);
+ return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
+ } finally {
+ this.readLock.unlock();
+ }
}
- public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
- ResourceRequest request =
- getResourceRequest(schedulerKey, ResourceRequest.ANY);
- return (request == null) ? null : request.getCapability();
+ public Resource getResource(SchedulerRequestKey schedulerKey) {
+ try {
+ this.readLock.lock();
+ ResourceRequest request =
+ getResourceRequest(schedulerKey, ResourceRequest.ANY);
+ return (request == null) ? null : request.getCapability();
+ } finally {
+ this.readLock.unlock();
+ }
}
/**
@@ -582,8 +641,7 @@ public class AppSchedulingInfo {
}
}
- public synchronized void increaseContainer(
- SchedContainerChangeRequest increaseRequest) {
+ public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId();
SchedulerRequestKey schedulerKey =
increaseRequest.getRMContainer().getAllocatedSchedulerKey();
@@ -596,16 +654,21 @@ public class AppSchedulingInfo {
+ increaseRequest.getNodeId() + " user=" + user + " resource="
+ deltaCapacity);
}
- // Set queue metrics
- queue.getMetrics().allocateResources(user, deltaCapacity);
- // remove the increase request from pending increase request map
- removeIncreaseRequest(nodeId, schedulerKey, containerId);
- // update usage
- appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
+ try {
+ this.writeLock.lock();
+ // Set queue metrics
+ queue.getMetrics().allocateResources(user, deltaCapacity);
+ // remove the increase request from pending increase request map
+ removeIncreaseRequest(nodeId, schedulerKey, containerId);
+ // update usage
+ appResourceUsage.incUsed(increaseRequest.getNodePartition(),
+ deltaCapacity);
+ } finally {
+ this.writeLock.unlock();
+ }
}
- public synchronized void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest) {
+ public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
// Delta is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
@@ -615,12 +678,17 @@ public class AppSchedulingInfo {
+ decreaseRequest.getNodeId() + " user=" + user + " resource="
+ absDelta);
}
-
- // Set queue metrics
- queue.getMetrics().releaseResources(user, absDelta);
- // update usage
- appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+ try {
+ this.writeLock.lock();
+ // Set queue metrics
+ queue.getMetrics().releaseResources(user, absDelta);
+
+ // update usage
+ appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+ } finally {
+ this.writeLock.unlock();
+ }
}
/**
@@ -633,43 +701,48 @@ public class AppSchedulingInfo {
* @param containerAllocated Container Allocated
* @return List of ResourceRequests
*/
- public synchronized List<ResourceRequest> allocate(NodeType type,
- SchedulerNode node, SchedulerRequestKey schedulerKey,
- ResourceRequest request, Container containerAllocated) {
+ public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+ SchedulerRequestKey schedulerKey, ResourceRequest request,
+ Container containerAllocated) {
List<ResourceRequest> resourceRequests = new ArrayList<>();
- if (type == NodeType.NODE_LOCAL) {
- allocateNodeLocal(node, schedulerKey, request, resourceRequests);
- } else if (type == NodeType.RACK_LOCAL) {
- allocateRackLocal(node, schedulerKey, request, resourceRequests);
- } else {
- allocateOffSwitch(request, resourceRequests, schedulerKey);
- }
- QueueMetrics metrics = queue.getMetrics();
- if (pending) {
- // once an allocation is done we assume the application is
- // running from scheduler's POV.
- pending = false;
- metrics.runAppAttempt(applicationId, user);
- }
+ try {
+ this.writeLock.lock();
+ if (type == NodeType.NODE_LOCAL) {
+ allocateNodeLocal(node, schedulerKey, request, resourceRequests);
+ } else if (type == NodeType.RACK_LOCAL) {
+ allocateRackLocal(node, schedulerKey, request, resourceRequests);
+ } else {
+ allocateOffSwitch(request, resourceRequests, schedulerKey);
+ }
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // once an allocation is done we assume the application is
+ // running from scheduler's POV.
+ pending = false;
+ metrics.runAppAttempt(applicationId, user);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: applicationId=" + applicationId
- + " container=" + containerAllocated.getId()
- + " host=" + containerAllocated.getNodeId().toString()
- + " user=" + user
- + " resource=" + request.getCapability()
- + " type=" + type);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocate: applicationId=" + applicationId
+ + " container=" + containerAllocated.getId()
+ + " host=" + containerAllocated.getNodeId().toString()
+ + " user=" + user
+ + " resource=" + request.getCapability()
+ + " type=" + type);
+ }
+ metrics.allocateResources(user, 1, request.getCapability(), true);
+ metrics.incrNodeTypeAggregations(user, type);
+ return resourceRequests;
+ } finally {
+ this.writeLock.unlock();
}
- metrics.allocateResources(user, 1, request.getCapability(), true);
- metrics.incrNodeTypeAggregations(user, type);
- return resourceRequests;
}
/**
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
- private synchronized void allocateNodeLocal(SchedulerNode node,
+ private void allocateNodeLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
@@ -701,7 +774,7 @@ public class AppSchedulingInfo {
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
- private synchronized void allocateRackLocal(SchedulerNode node,
+ private void allocateRackLocal(SchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
@@ -720,8 +793,8 @@ public class AppSchedulingInfo {
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
- private synchronized void allocateOffSwitch(
- ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
+ private void allocateOffSwitch(ResourceRequest offSwitchRequest,
+ List<ResourceRequest> resourceRequests,
SchedulerRequestKey schedulerKey) {
// Update future requirements
decrementOutstanding(offSwitchRequest, schedulerKey);
@@ -729,8 +802,8 @@ public class AppSchedulingInfo {
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
- private synchronized void decrementOutstanding(
- ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
+ private void decrementOutstanding(ResourceRequest offSwitchRequest,
+ SchedulerRequestKey schedulerKey) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
@@ -748,66 +821,81 @@ public class AppSchedulingInfo {
queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
offSwitchRequest.getCapability());
}
-
- private synchronized void checkForDeactivation() {
+
+ private void checkForDeactivation() {
if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId);
}
}
- public synchronized void move(Queue newQueue) {
- QueueMetrics oldMetrics = queue.getMetrics();
- QueueMetrics newMetrics = newQueue.getMetrics();
- for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
- ResourceRequest request = asks.get(ResourceRequest.ANY);
- if (request != null) {
- oldMetrics.decrPendingResources(user, request.getNumContainers(),
- request.getCapability());
- newMetrics.incrPendingResources(user, request.getNumContainers(),
- request.getCapability());
-
- Resource delta = Resources.multiply(request.getCapability(),
- request.getNumContainers());
- // Update Queue
- queue.decPendingResource(request.getNodeLabelExpression(), delta);
- newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
+ public void move(Queue newQueue) {
+ try {
+ this.writeLock.lock();
+ QueueMetrics oldMetrics = queue.getMetrics();
+ QueueMetrics newMetrics = newQueue.getMetrics();
+ for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
+ ResourceRequest request = asks.get(ResourceRequest.ANY);
+ if (request != null) {
+ oldMetrics.decrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+ newMetrics.incrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+
+ Resource delta = Resources.multiply(request.getCapability(),
+ request.getNumContainers());
+ // Update Queue
+ queue.decPendingResource(request.getNodeLabelExpression(), delta);
+ newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
+ }
}
+ oldMetrics.moveAppFrom(this);
+ newMetrics.moveAppTo(this);
+ activeUsersManager.deactivateApplication(user, applicationId);
+ activeUsersManager = newQueue.getActiveUsersManager();
+ activeUsersManager.activateApplication(user, applicationId);
+ this.queue = newQueue;
+ } finally {
+ this.writeLock.unlock();
}
- oldMetrics.moveAppFrom(this);
- newMetrics.moveAppTo(this);
- activeUsersManager.deactivateApplication(user, applicationId);
- activeUsersManager = newQueue.getActiveUsersManager();
- activeUsersManager.activateApplication(user, applicationId);
- this.queue = newQueue;
}
- public synchronized void stop() {
+ public void stop() {
// clear pending resources metrics for the application
- QueueMetrics metrics = queue.getMetrics();
- for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
- ResourceRequest request = asks.get(ResourceRequest.ANY);
- if (request != null) {
- metrics.decrPendingResources(user, request.getNumContainers(),
- request.getCapability());
-
- // Update Queue
- queue.decPendingResource(
- request.getNodeLabelExpression(),
- Resources.multiply(request.getCapability(),
- request.getNumContainers()));
+ try {
+ this.writeLock.lock();
+ QueueMetrics metrics = queue.getMetrics();
+ for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
+ ResourceRequest request = asks.get(ResourceRequest.ANY);
+ if (request != null) {
+ metrics.decrPendingResources(user, request.getNumContainers(),
+ request.getCapability());
+
+ // Update Queue
+ queue.decPendingResource(
+ request.getNodeLabelExpression(),
+ Resources.multiply(request.getCapability(),
+ request.getNumContainers()));
+ }
}
+ metrics.finishAppAttempt(applicationId, pending, user);
+
+ // Clear requests themselves
+ clearRequests();
+ } finally {
+ this.writeLock.unlock();
}
- metrics.finishAppAttempt(applicationId, pending, user);
-
- // Clear requests themselves
- clearRequests();
}
- public synchronized void setQueue(Queue queue) {
- this.queue = queue;
+ public void setQueue(Queue queue) {
+ try {
+ this.writeLock.lock();
+ this.queue = queue;
+ } finally {
+ this.writeLock.unlock();
+ }
}
- public Set<String> getBlackList() {
+ private Set<String> getBlackList() {
return this.placesBlacklistedByApp;
}
@@ -817,31 +905,36 @@ public class AppSchedulingInfo {
}
}
- public synchronized void transferStateFromPreviousAppSchedulingInfo(
+ public void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) {
- // This should not require locking the userBlacklist since it will not be
- // used by this instance until after setCurrentAppAttempt.
+ // This should not require locking the placesBlacklistedByApp since it will
+ // not be used by this instance until after setCurrentAppAttempt.
this.placesBlacklistedByApp = appInfo.getBlackList();
}
- public synchronized void recoverContainer(RMContainer rmContainer) {
- QueueMetrics metrics = queue.getMetrics();
- if (pending) {
- // If there was any container to recover, the application was
- // running from scheduler's POV.
- pending = false;
- metrics.runAppAttempt(applicationId, user);
- }
+ public void recoverContainer(RMContainer rmContainer) {
+ try {
+ this.writeLock.lock();
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // If there was any container to recover, the application was
+ // running from scheduler's POV.
+ pending = false;
+ metrics.runAppAttempt(applicationId, user);
+ }
- // Container is completed. Skip recovering resources.
- if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
- return;
- }
+ // Container is completed. Skip recovering resources.
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
- metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
- false);
+ metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+ false);
+ } finally {
+ this.writeLock.unlock();
+ }
}
-
+
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
ResourceRequest newRequest =
ResourceRequest.newInstance(request.getPriority(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org