You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:24 UTC

[11/57] [abbrv] hadoop git commit: YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)

YARN-3142. Improve locks in AppSchedulingInfo. (Varun Saxena via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1831be8e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1831be8e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1831be8e

Branch: refs/heads/HDFS-10467
Commit: 1831be8e737fd423a9f3d590767b944147e85641
Parents: 875062b
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Sep 27 11:54:55 2016 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Sep 27 11:54:55 2016 -0700

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            | 619 +++++++++++--------
 1 file changed, 356 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1831be8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 39820f7..59a6650 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,8 @@ public class AppSchedulingInfo {
 
   private Queue queue;
   private ActiveUsersManager activeUsersManager;
-  private boolean pending = true; // whether accepted/allocated by scheduler
+  // whether accepted/allocated by scheduler
+  private volatile boolean pending = true;
   private ResourceUsage appResourceUsage;
 
   private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
@@ -86,6 +88,9 @@ public class AppSchedulingInfo {
       SchedContainerChangeRequest>>> containerIncreaseRequestMap =
       new ConcurrentHashMap<>();
 
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       long epoch, ResourceUsage appResourceUsage) {
@@ -97,6 +102,10 @@ public class AppSchedulingInfo {
     this.containerIdCounter = new AtomicLong(
         epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
   }
 
   public ApplicationId getApplicationId() {
@@ -115,14 +124,19 @@ public class AppSchedulingInfo {
     return this.containerIdCounter.incrementAndGet();
   }
 
-  public synchronized String getQueueName() {
-    return queue.getQueueName();
+  public String getQueueName() {
+    try {
+      this.readLock.lock();
+      return queue.getQueueName();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
-  public synchronized boolean isPending() {
+  public boolean isPending() {
     return pending;
   }
-  
+
   public Set<String> getRequestedPartitions() {
     return requestedPartitions;
   }
@@ -130,88 +144,103 @@ public class AppSchedulingInfo {
   /**
    * Clear any pending requests from this application.
    */
-  private synchronized void clearRequests() {
+  private void clearRequests() {
     schedulerKeys.clear();
     resourceRequestMap.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
-  public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    return requestsOnNode == null ? false : requestsOnNode.size() > 0;
+  public boolean hasIncreaseRequest(NodeId nodeId) {
+    try {
+      this.readLock.lock();
+      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+      return requestsOnNode == null ? false : requestsOnNode.size() > 0;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
-  public synchronized Map<ContainerId, SchedContainerChangeRequest>
+  public Map<ContainerId, SchedContainerChangeRequest>
       getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    return requestsOnNode == null ? null : requestsOnNode.get(
-        schedulerKey);
+    try {
+      this.readLock.lock();
+      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+      return requestsOnNode == null ? null : requestsOnNode.get(
+          schedulerKey);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * return true if any of the existing increase requests are updated,
    *        false if none of them are updated
    */
-  public synchronized boolean updateIncreaseRequests(
+  public boolean updateIncreaseRequests(
       List<SchedContainerChangeRequest> increaseRequests) {
     boolean resourceUpdated = false;
 
-    for (SchedContainerChangeRequest r : increaseRequests) {
-      if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
-        LOG.warn("rmContainer's state is not RUNNING, for increase request with"
-            + " container-id=" + r.getContainerId());
-        continue;
-      }
-      try {
-        RMServerUtils.checkSchedContainerChangeRequest(r, true);
-      } catch (YarnException e) {
-        LOG.warn("Error happens when checking increase request, Ignoring.."
-            + " exception=", e);
-        continue;
-      }
-      NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      if (null == requestsOnNode) {
-        requestsOnNode = new TreeMap<>();
-        containerIncreaseRequestMap.put(nodeId, requestsOnNode);
-      }
-
-      SchedContainerChangeRequest prevChangeRequest =
-          getIncreaseRequest(nodeId,
-              r.getRMContainer().getAllocatedSchedulerKey(),
-              r.getContainerId());
-      if (null != prevChangeRequest) {
-        if (Resources.equals(prevChangeRequest.getTargetCapacity(),
-            r.getTargetCapacity())) {
-          // increase request hasn't changed
+    try {
+      this.writeLock.lock();
+      for (SchedContainerChangeRequest r : increaseRequests) {
+        if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
+          LOG.warn("rmContainer's state is not RUNNING, for increase request"
+              + " with container-id=" + r.getContainerId());
           continue;
         }
+        try {
+          RMServerUtils.checkSchedContainerChangeRequest(r, true);
+        } catch (YarnException e) {
+          LOG.warn("Error happens when checking increase request, Ignoring.."
+              + " exception=", e);
+          continue;
+        }
+        NodeId nodeId = r.getRMContainer().getAllocatedNode();
 
-        // remove the old one, as we will use the new one going forward
-        removeIncreaseRequest(nodeId,
-            prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
-            prevChangeRequest.getContainerId());
-      }
+        Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+            requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+        if (null == requestsOnNode) {
+          requestsOnNode = new TreeMap<>();
+          containerIncreaseRequestMap.put(nodeId, requestsOnNode);
+        }
 
-      if (Resources.equals(r.getTargetCapacity(),
-          r.getRMContainer().getAllocatedResource())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trying to increase container " + r.getContainerId()
-              + ", target capacity = previous capacity = " + prevChangeRequest
-              + ". Will ignore this increase request.");
+        SchedContainerChangeRequest prevChangeRequest =
+            getIncreaseRequest(nodeId,
+                r.getRMContainer().getAllocatedSchedulerKey(),
+                r.getContainerId());
+        if (null != prevChangeRequest) {
+          if (Resources.equals(prevChangeRequest.getTargetCapacity(),
+              r.getTargetCapacity())) {
+            // increase request hasn't changed
+            continue;
+          }
+
+          // remove the old one, as we will use the new one going forward
+          removeIncreaseRequest(nodeId,
+              prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
+              prevChangeRequest.getContainerId());
         }
-        continue;
-      }
 
-      // add the new one
-      resourceUpdated = true;
-      insertIncreaseRequest(r);
+        if (Resources.equals(r.getTargetCapacity(),
+            r.getRMContainer().getAllocatedResource())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Trying to increase container " + r.getContainerId()
+                + ", target capacity = previous capacity = " + prevChangeRequest
+                + ". Will ignore this increase request.");
+          }
+          continue;
+        }
+
+        // add the new one
+        resourceUpdated = true;
+        insertIncreaseRequest(r);
+      }
+      return resourceUpdated;
+    } finally {
+      this.writeLock.unlock();
     }
-    return resourceUpdated;
   }
 
   /**
@@ -275,61 +304,71 @@ public class AppSchedulingInfo {
     }
   }
 
-  public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+  public boolean removeIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return false;
-    }
+    try {
+      this.writeLock.lock();
+      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+      if (null == requestsOnNode) {
+        return false;
+      }
 
-    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(schedulerKey);
-    if (null == requestsOnNodeWithPriority) {
-      return false;
-    }
+      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+          requestsOnNode.get(schedulerKey);
+      if (null == requestsOnNodeWithPriority) {
+        return false;
+      }
 
-    SchedContainerChangeRequest request =
-        requestsOnNodeWithPriority.remove(containerId);
-    
-    // remove hierarchies if it becomes empty
-    if (requestsOnNodeWithPriority.isEmpty()) {
-      requestsOnNode.remove(schedulerKey);
-      decrementSchedulerKeyReference(schedulerKey);
-    }
-    if (requestsOnNode.isEmpty()) {
-      containerIncreaseRequestMap.remove(nodeId);
-    }
+      SchedContainerChangeRequest request =
+          requestsOnNodeWithPriority.remove(containerId);
     
-    if (request == null) {
-      return false;
-    }
+      // remove hierarchies if it becomes empty
+      if (requestsOnNodeWithPriority.isEmpty()) {
+        requestsOnNode.remove(schedulerKey);
+        decrementSchedulerKeyReference(schedulerKey);
+      }
+      if (requestsOnNode.isEmpty()) {
+        containerIncreaseRequestMap.remove(nodeId);
+      }
 
-    // update queue's pending resource if request exists
-    String partition = request.getRMContainer().getNodeLabelExpression();
-    Resource delta = request.getDeltaCapacity();
-    appResourceUsage.decPending(partition, delta);
-    queue.decPendingResource(partition, delta);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("remove increase request:" + request);
+      if (request == null) {
+        return false;
+      }
+
+      // update queue's pending resource if request exists
+      String partition = request.getRMContainer().getNodeLabelExpression();
+      Resource delta = request.getDeltaCapacity();
+      appResourceUsage.decPending(partition, delta);
+      queue.decPendingResource(partition, delta);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("remove increase request:" + request);
+      }
+
+      return true;
+    } finally {
+      this.writeLock.unlock();
     }
-    
-    return true;
   }
   
   public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      return null;
-    }
+    try {
+      this.readLock.lock();
+      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
+          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
+      if (null == requestsOnNode) {
+        return null;
+      }
 
-    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(schedulerKey);
-    return requestsOnNodeWithPriority == null ? null
-        : requestsOnNodeWithPriority.get(containerId);
+      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+          requestsOnNode.get(schedulerKey);
+      return requestsOnNodeWithPriority == null ? null
+          : requestsOnNodeWithPriority.get(containerId);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
@@ -343,49 +382,54 @@ public class AppSchedulingInfo {
    *          recover ResourceRequest on preemption
    * @return true if any resource was updated, false otherwise
    */
-  public synchronized boolean updateResourceRequests(
-      List<ResourceRequest> requests,
+  public boolean updateResourceRequests(List<ResourceRequest> requests,
       boolean recoverPreemptedRequestForAContainer) {
     // Flag to track if any incoming requests update "ANY" requests
     boolean anyResourcesUpdated = false;
 
-    // Update resource requests
-    for (ResourceRequest request : requests) {
-      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
-      String resourceName = request.getResourceName();
-
-      // Update node labels if required
-      updateNodeLabels(request);
-
-      Map<String, ResourceRequest> asks =
-          this.resourceRequestMap.get(schedulerKey);
-      if (asks == null) {
-        asks = new ConcurrentHashMap<>();
-        this.resourceRequestMap.put(schedulerKey, asks);
-      }
+    try {
+      this.writeLock.lock();
+      // Update resource requests
+      for (ResourceRequest request : requests) {
+        SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
+        String resourceName = request.getResourceName();
+
+        // Update node labels if required
+        updateNodeLabels(request);
+
+        Map<String, ResourceRequest> asks =
+            this.resourceRequestMap.get(schedulerKey);
+        if (asks == null) {
+          asks = new ConcurrentHashMap<>();
+          this.resourceRequestMap.put(schedulerKey, asks);
+        }
 
-      // Increment number of containers if recovering preempted resources
-      ResourceRequest lastRequest = asks.get(resourceName);
-      if (recoverPreemptedRequestForAContainer && lastRequest != null) {
-        request.setNumContainers(lastRequest.getNumContainers() + 1);
-      }
+        // Increment number of containers if recovering preempted resources
+        ResourceRequest lastRequest = asks.get(resourceName);
+        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
+          request.setNumContainers(lastRequest.getNumContainers() + 1);
+        }
 
-      // Update asks
-      asks.put(resourceName, request);
+        // Update asks
+        asks.put(resourceName, request);
 
-      if (resourceName.equals(ResourceRequest.ANY)) {
-        //update the applications requested labels set
-        requestedPartitions.add(request.getNodeLabelExpression() == null
-            ? RMNodeLabelsManager.NO_LABEL : request.getNodeLabelExpression());
+        if (resourceName.equals(ResourceRequest.ANY)) {
+          //update the applications requested labels set
+          requestedPartitions.add(request.getNodeLabelExpression() == null
+              ? RMNodeLabelsManager.NO_LABEL :
+                  request.getNodeLabelExpression());
 
-        anyResourcesUpdated = true;
+          anyResourcesUpdated = true;
 
-        // Update pendingResources
-        updatePendingResources(lastRequest, request, schedulerKey,
-            queue.getMetrics());
+          // Update pendingResources
+          updatePendingResources(lastRequest, request, schedulerKey,
+              queue.getMetrics());
+        }
       }
+      return anyResourcesUpdated;
+    } finally {
+      this.writeLock.unlock();
     }
-    return anyResourcesUpdated;
   }
 
   private void updatePendingResources(ResourceRequest lastRequest,
@@ -529,34 +573,49 @@ public class AppSchedulingInfo {
     return userBlacklistChanged.getAndSet(false);
   }
 
-  public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
+  public Collection<SchedulerRequestKey> getSchedulerKeys() {
     return schedulerKeys.keySet();
   }
 
-  public synchronized Map<String, ResourceRequest> getResourceRequests(
+  public Map<String, ResourceRequest> getResourceRequests(
       SchedulerRequestKey schedulerKey) {
     return resourceRequestMap.get(schedulerKey);
   }
 
-  public synchronized List<ResourceRequest> getAllResourceRequests() {
+  public List<ResourceRequest> getAllResourceRequests() {
     List<ResourceRequest> ret = new ArrayList<>();
-    for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
-      ret.addAll(r.values());
+    try {
+      this.readLock.lock();
+      for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
+        ret.addAll(r.values());
+      }
+    } finally {
+      this.readLock.unlock();
     }
     return ret;
   }
 
-  public synchronized ResourceRequest getResourceRequest(
-      SchedulerRequestKey schedulerKey, String resourceName) {
-    Map<String, ResourceRequest> nodeRequests =
-        resourceRequestMap.get(schedulerKey);
-    return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
+  public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey,
+      String resourceName) {
+    try {
+      this.readLock.lock();
+      Map<String, ResourceRequest> nodeRequests =
+          resourceRequestMap.get(schedulerKey);
+      return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
-  public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
-    ResourceRequest request =
-        getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    return (request == null) ? null : request.getCapability();
+  public Resource getResource(SchedulerRequestKey schedulerKey) {
+    try {
+      this.readLock.lock();
+      ResourceRequest request =
+          getResourceRequest(schedulerKey, ResourceRequest.ANY);
+      return (request == null) ? null : request.getCapability();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
@@ -582,8 +641,7 @@ public class AppSchedulingInfo {
     }
   }
 
-  public synchronized void increaseContainer(
-      SchedContainerChangeRequest increaseRequest) {
+  public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
     NodeId nodeId = increaseRequest.getNodeId();
     SchedulerRequestKey schedulerKey =
         increaseRequest.getRMContainer().getAllocatedSchedulerKey();
@@ -596,16 +654,21 @@ public class AppSchedulingInfo {
           + increaseRequest.getNodeId() + " user=" + user + " resource="
           + deltaCapacity);
     }
-    // Set queue metrics
-    queue.getMetrics().allocateResources(user, deltaCapacity);
-    // remove the increase request from pending increase request map
-    removeIncreaseRequest(nodeId, schedulerKey, containerId);
-    // update usage
-    appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
+    try {
+      this.writeLock.lock();
+      // Set queue metrics
+      queue.getMetrics().allocateResources(user, deltaCapacity);
+      // remove the increase request from pending increase request map
+      removeIncreaseRequest(nodeId, schedulerKey, containerId);
+      // update usage
+      appResourceUsage.incUsed(increaseRequest.getNodePartition(),
+          deltaCapacity);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
   
-  public synchronized void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest) {
+  public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
     // Delta is negative when it's a decrease request
     Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
 
@@ -615,12 +678,17 @@ public class AppSchedulingInfo {
           + decreaseRequest.getNodeId() + " user=" + user + " resource="
           + absDelta);
     }
-    
-    // Set queue metrics
-    queue.getMetrics().releaseResources(user, absDelta);
 
-    // update usage
-    appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+    try {
+      this.writeLock.lock();
+      // Set queue metrics
+      queue.getMetrics().releaseResources(user, absDelta);
+
+      // update usage
+      appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   /**
@@ -633,43 +701,48 @@ public class AppSchedulingInfo {
    * @param containerAllocated Container Allocated
    * @return List of ResourceRequests
    */
-  public synchronized List<ResourceRequest> allocate(NodeType type,
-      SchedulerNode node, SchedulerRequestKey schedulerKey,
-      ResourceRequest request, Container containerAllocated) {
+  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+      SchedulerRequestKey schedulerKey, ResourceRequest request,
+      Container containerAllocated) {
     List<ResourceRequest> resourceRequests = new ArrayList<>();
-    if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, schedulerKey, request, resourceRequests);
-    } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, schedulerKey, request, resourceRequests);
-    } else {
-      allocateOffSwitch(request, resourceRequests, schedulerKey);
-    }
-    QueueMetrics metrics = queue.getMetrics();
-    if (pending) {
-      // once an allocation is done we assume the application is
-      // running from scheduler's POV.
-      pending = false;
-      metrics.runAppAttempt(applicationId, user);
-    }
+    try {
+      this.writeLock.lock();
+      if (type == NodeType.NODE_LOCAL) {
+        allocateNodeLocal(node, schedulerKey, request, resourceRequests);
+      } else if (type == NodeType.RACK_LOCAL) {
+        allocateRackLocal(node, schedulerKey, request, resourceRequests);
+      } else {
+        allocateOffSwitch(request, resourceRequests, schedulerKey);
+      }
+      QueueMetrics metrics = queue.getMetrics();
+      if (pending) {
+        // once an allocation is done we assume the application is
+        // running from scheduler's POV.
+        pending = false;
+        metrics.runAppAttempt(applicationId, user);
+      }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationId=" + applicationId
-          + " container=" + containerAllocated.getId()
-          + " host=" + containerAllocated.getNodeId().toString()
-          + " user=" + user
-          + " resource=" + request.getCapability()
-          + " type=" + type);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationId=" + applicationId
+            + " container=" + containerAllocated.getId()
+            + " host=" + containerAllocated.getNodeId().toString()
+            + " user=" + user
+            + " resource=" + request.getCapability()
+            + " type=" + type);
+      }
+      metrics.allocateResources(user, 1, request.getCapability(), true);
+      metrics.incrNodeTypeAggregations(user, type);
+      return resourceRequests;
+    } finally {
+      this.writeLock.unlock();
     }
-    metrics.allocateResources(user, 1, request.getCapability(), true);
-    metrics.incrNodeTypeAggregations(user, type);
-    return resourceRequests;
   }
 
   /**
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private synchronized void allocateNodeLocal(SchedulerNode node,
+  private void allocateNodeLocal(SchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
@@ -701,7 +774,7 @@ public class AppSchedulingInfo {
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private synchronized void allocateRackLocal(SchedulerNode node,
+  private void allocateRackLocal(SchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
@@ -720,8 +793,8 @@ public class AppSchedulingInfo {
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private synchronized void allocateOffSwitch(
-      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
+  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
+      List<ResourceRequest> resourceRequests,
       SchedulerRequestKey schedulerKey) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest, schedulerKey);
@@ -729,8 +802,8 @@ public class AppSchedulingInfo {
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
-  private synchronized void decrementOutstanding(
-      ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
+  private void decrementOutstanding(ResourceRequest offSwitchRequest,
+      SchedulerRequestKey schedulerKey) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
@@ -748,66 +821,81 @@ public class AppSchedulingInfo {
     queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
         offSwitchRequest.getCapability());
   }
-  
-  private synchronized void checkForDeactivation() {
+
+  private void checkForDeactivation() {
     if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
   }
   
-  public synchronized void move(Queue newQueue) {
-    QueueMetrics oldMetrics = queue.getMetrics();
-    QueueMetrics newMetrics = newQueue.getMetrics();
-    for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
-      ResourceRequest request = asks.get(ResourceRequest.ANY);
-      if (request != null) {
-        oldMetrics.decrPendingResources(user, request.getNumContainers(),
-            request.getCapability());
-        newMetrics.incrPendingResources(user, request.getNumContainers(),
-            request.getCapability());
-        
-        Resource delta = Resources.multiply(request.getCapability(),
-            request.getNumContainers()); 
-        // Update Queue
-        queue.decPendingResource(request.getNodeLabelExpression(), delta);
-        newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
+  public void move(Queue newQueue) {
+    try {
+      this.writeLock.lock();
+      QueueMetrics oldMetrics = queue.getMetrics();
+      QueueMetrics newMetrics = newQueue.getMetrics();
+      for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
+        ResourceRequest request = asks.get(ResourceRequest.ANY);
+        if (request != null) {
+          oldMetrics.decrPendingResources(user, request.getNumContainers(),
+              request.getCapability());
+          newMetrics.incrPendingResources(user, request.getNumContainers(),
+              request.getCapability());
+
+          Resource delta = Resources.multiply(request.getCapability(),
+              request.getNumContainers());
+          // Update Queue
+          queue.decPendingResource(request.getNodeLabelExpression(), delta);
+          newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
+        }
       }
+      oldMetrics.moveAppFrom(this);
+      newMetrics.moveAppTo(this);
+      activeUsersManager.deactivateApplication(user, applicationId);
+      activeUsersManager = newQueue.getActiveUsersManager();
+      activeUsersManager.activateApplication(user, applicationId);
+      this.queue = newQueue;
+    } finally {
+      this.writeLock.unlock();
     }
-    oldMetrics.moveAppFrom(this);
-    newMetrics.moveAppTo(this);
-    activeUsersManager.deactivateApplication(user, applicationId);
-    activeUsersManager = newQueue.getActiveUsersManager();
-    activeUsersManager.activateApplication(user, applicationId);
-    this.queue = newQueue;
   }
 
-  public synchronized void stop() {
+  public void stop() {
     // clear pending resources metrics for the application
-    QueueMetrics metrics = queue.getMetrics();
-    for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
-      ResourceRequest request = asks.get(ResourceRequest.ANY);
-      if (request != null) {
-        metrics.decrPendingResources(user, request.getNumContainers(),
-            request.getCapability());
-        
-        // Update Queue
-        queue.decPendingResource(
-            request.getNodeLabelExpression(),
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
+    try {
+      this.writeLock.lock();
+      QueueMetrics metrics = queue.getMetrics();
+      for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
+        ResourceRequest request = asks.get(ResourceRequest.ANY);
+        if (request != null) {
+          metrics.decrPendingResources(user, request.getNumContainers(),
+              request.getCapability());
+
+          // Update Queue
+          queue.decPendingResource(
+              request.getNodeLabelExpression(),
+              Resources.multiply(request.getCapability(),
+                  request.getNumContainers()));
+        }
       }
+      metrics.finishAppAttempt(applicationId, pending, user);
+
+      // Clear requests themselves
+      clearRequests();
+    } finally {
+      this.writeLock.unlock();
     }
-    metrics.finishAppAttempt(applicationId, pending, user);
-    
-    // Clear requests themselves
-    clearRequests();
   }
 
-  public synchronized void setQueue(Queue queue) {
-    this.queue = queue;
+  public void setQueue(Queue queue) {
+    try {
+      this.writeLock.lock();
+      this.queue = queue;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  public Set<String> getBlackList() {
+  private Set<String> getBlackList() {
     return this.placesBlacklistedByApp;
   }
 
@@ -817,31 +905,36 @@ public class AppSchedulingInfo {
     }
   }
 
-  public synchronized void transferStateFromPreviousAppSchedulingInfo(
+  public void transferStateFromPreviousAppSchedulingInfo(
       AppSchedulingInfo appInfo) {
-    // This should not require locking the userBlacklist since it will not be
-    // used by this instance until after setCurrentAppAttempt.
+    // This should not require locking the placesBlacklistedByApp since it will
+    // not be used by this instance until after setCurrentAppAttempt.
     this.placesBlacklistedByApp = appInfo.getBlackList();
   }
 
-  public synchronized void recoverContainer(RMContainer rmContainer) {
-    QueueMetrics metrics = queue.getMetrics();
-    if (pending) {
-      // If there was any container to recover, the application was
-      // running from scheduler's POV.
-      pending = false;
-      metrics.runAppAttempt(applicationId, user);
-    }
+  public void recoverContainer(RMContainer rmContainer) {
+    try {
+      this.writeLock.lock();
+      QueueMetrics metrics = queue.getMetrics();
+      if (pending) {
+        // If there was any container to recover, the application was
+        // running from scheduler's POV.
+        pending = false;
+        metrics.runAppAttempt(applicationId, user);
+      }
 
-    // Container is completed. Skip recovering resources.
-    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-      return;
-    }
+      // Container is completed. Skip recovering resources.
+      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+        return;
+      }
 
-    metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
-      false);
+      metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+          false);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
-  
+
   public ResourceRequest cloneResourceRequest(ResourceRequest request) {
     ResourceRequest newRequest =
         ResourceRequest.newInstance(request.getPriority(),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org