You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/10/08 06:10:06 UTC

[10/50] [abbrv] hadoop git commit: YARN-3139. Improve locks in AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6129772..eecd4ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -2227,6 +2227,22 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
+      Priority newAppPriority) {
+    try {
+      writeLock.lock();
+      FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
+      getOrderingPolicy().removeSchedulableEntity(attempt);
+
+      // Update new priority in SchedulerApplication
+      attempt.setPriority(newAppPriority);
+
+      getOrderingPolicy().addSchedulableEntity(attempt);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   public OrderingPolicy<FiCaSchedulerApp>
       getPendingAppsOrderingPolicy() {
     return pendingOrderingPolicy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index fd43e74..aa7ad50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -666,6 +667,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     } finally {
       writeLock.unlock();
     }
+  }
 
+  public ReentrantReadWriteLock.WriteLock getWriteLock() {
+    return this.writeLock;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 920052f..8daf0f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -186,10 +186,13 @@ public class FairScheduler extends
                                       // an app can be reserved on
 
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
-  protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
-  protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+  // Continuous Scheduling enabled or not
+  protected boolean continuousSchedulingEnabled;
+  // Sleep time for each pass in continuous scheduling
+  protected volatile int continuousSchedulingSleepMs;
+  // Node available resource comparator
   private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
-          new NodeAvailableResourceComparator(); // Node available resource comparator
+          new NodeAvailableResourceComparator();
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
   protected long nodeLocalityDelayMs; // Delay for node locality
@@ -338,36 +341,40 @@ public class FairScheduler extends
    * fair shares, deficits, minimum slot allocations, and amount of used and
    * required resources per job.
    */
-  protected synchronized void update() {
-    long start = getClock().getTime();
-    updateStarvationStats(); // Determine if any queues merit preemption
+  protected void update() {
+    try {
+      writeLock.lock();
+      long start = getClock().getTime();
+      updateStarvationStats(); // Determine if any queues merit preemption
 
-    FSQueue rootQueue = queueMgr.getRootQueue();
+      FSQueue rootQueue = queueMgr.getRootQueue();
 
-    // Recursively update demands for all queues
-    rootQueue.updateDemand();
+      // Recursively update demands for all queues
+      rootQueue.updateDemand();
 
-    Resource clusterResource = getClusterResource();
-    rootQueue.setFairShare(clusterResource);
-    // Recursively compute fair shares for all queues
-    // and update metrics
-    rootQueue.recomputeShares();
-    updateRootQueueMetrics();
+      Resource clusterResource = getClusterResource();
+      rootQueue.setFairShare(clusterResource);
+      // Recursively compute fair shares for all queues
+      // and update metrics
+      rootQueue.recomputeShares();
+      updateRootQueueMetrics();
 
-    if (LOG.isDebugEnabled()) {
-      if (--updatesToSkipForDebug < 0) {
-        updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
-        LOG.debug("Cluster Capacity: " + clusterResource +
-            "  Allocations: " + rootMetrics.getAllocatedResources() +
-            "  Availability: " + Resource.newInstance(
-            rootMetrics.getAvailableMB(),
-            rootMetrics.getAvailableVirtualCores()) +
-            "  Demand: " + rootQueue.getDemand());
+      if (LOG.isDebugEnabled()) {
+        if (--updatesToSkipForDebug < 0) {
+          updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+          LOG.debug("Cluster Capacity: " + clusterResource + "  Allocations: "
+              + rootMetrics.getAllocatedResources() + "  Availability: "
+              + Resource.newInstance(rootMetrics.getAvailableMB(),
+              rootMetrics.getAvailableVirtualCores()) + "  Demand: " + rootQueue
+              .getDemand());
+        }
       }
-    }
 
-    long duration = getClock().getTime() - start;
-    fsOpDurations.addUpdateCallDuration(duration);
+      long duration = getClock().getTime() - start;
+      fsOpDurations.addUpdateCallDuration(duration);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /**
@@ -389,23 +396,28 @@ public class FairScheduler extends
    * such queues exist, compute how many tasks of each type need to be preempted
    * and then select the right ones using preemptTasks.
    */
-  protected synchronized void preemptTasksIfNecessary() {
-    if (!shouldAttemptPreemption()) {
-      return;
-    }
+  protected void preemptTasksIfNecessary() {
+    try {
+      writeLock.lock();
+      if (!shouldAttemptPreemption()) {
+        return;
+      }
 
-    long curTime = getClock().getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval) {
-      return;
-    }
-    lastPreemptCheckTime = curTime;
+      long curTime = getClock().getTime();
+      if (curTime - lastPreemptCheckTime < preemptionInterval) {
+        return;
+      }
+      lastPreemptCheckTime = curTime;
 
-    Resource resToPreempt = Resources.clone(Resources.none());
-    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
-    }
-    if (isResourceGreaterThanNone(resToPreempt)) {
-      preemptResources(resToPreempt);
+      Resource resToPreempt = Resources.clone(Resources.none());
+      for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
+        Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
+      }
+      if (isResourceGreaterThanNone(resToPreempt)) {
+        preemptResources(resToPreempt);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -549,22 +561,27 @@ public class FairScheduler extends
     return deficit;
   }
 
-  public synchronized RMContainerTokenSecretManager
+  public RMContainerTokenSecretManager
       getContainerTokenSecretManager() {
     return rmContext.getContainerTokenSecretManager();
   }
 
-  // synchronized for sizeBasedWeight
-  public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
-    double weight = 1.0;
-    if (sizeBasedWeight) {
-      // Set weight based on current memory demand
-      weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+  public ResourceWeights getAppWeight(FSAppAttempt app) {
+    try {
+      readLock.lock();
+      double weight = 1.0;
+      if (sizeBasedWeight) {
+        // Set weight based on current memory demand
+        weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+      }
+      weight *= app.getPriority().getPriority();
+      ResourceWeights resourceWeights = app.getResourceWeights();
+      resourceWeights.setWeight((float) weight);
+      return resourceWeights;
+    } finally {
+      readLock.unlock();
     }
-    weight *= app.getPriority().getPriority();
-    ResourceWeights resourceWeights = app.getResourceWeights();
-    resourceWeights.setWeight((float)weight);
-    return resourceWeights;
+
   }
 
   public Resource getIncrementResourceCapability() {
@@ -595,7 +612,7 @@ public class FairScheduler extends
     return continuousSchedulingEnabled;
   }
 
-  public synchronized int getContinuousSchedulingSleepMs() {
+  public int getContinuousSchedulingSleepMs() {
     return continuousSchedulingSleepMs;
   }
 
@@ -617,114 +634,123 @@ public class FairScheduler extends
    * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void addApplication(ApplicationId applicationId,
+  protected void addApplication(ApplicationId applicationId,
       String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
-      String message = "Reject application " + applicationId +
-              " submitted by user " + user + " with an empty queue name.";
+      String message =
+          "Reject application " + applicationId + " submitted by user " + user
+              + " with an empty queue name.";
       LOG.info(message);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, message));
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+              message));
       return;
     }
 
     if (queueName.startsWith(".") || queueName.endsWith(".")) {
-      String message = "Reject application " + applicationId
-          + " submitted by user " + user + " with an illegal queue name "
-          + queueName + ". "
-          + "The queue name cannot start/end with period.";
+      String message =
+          "Reject application " + applicationId + " submitted by user " + user
+              + " with an illegal queue name " + queueName + ". "
+              + "The queue name cannot start/end with period.";
       LOG.info(message);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, message));
+      rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+              message));
       return;
     }
 
-    RMApp rmApp = rmContext.getRMApps().get(applicationId);
-    FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
-    if (queue == null) {
-      return;
-    }
+    try {
+      writeLock.lock();
+      RMApp rmApp = rmContext.getRMApps().get(applicationId);
+      FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+      if (queue == null) {
+        return;
+      }
 
-    // Enforce ACLs
-    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
+      // Enforce ACLs
+      UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+          user);
+
+      if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
+          .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+        String msg = "User " + userUgi.getUserName()
+            + " cannot submit applications to queue " + queue.getName()
+            + "(requested queuename is " + queueName + ")";
+        LOG.info(msg);
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
+        return;
+      }
 
-    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
-        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
-      String msg = "User " + userUgi.getUserName() +
-              " cannot submit applications to queue " + queue.getName() +
-              "(requested queuename is " + queueName + ")";
-      LOG.info(msg);
-      rmContext.getDispatcher().getEventHandler()
-          .handle(new RMAppEvent(applicationId,
-              RMAppEventType.APP_REJECTED, msg));
-      return;
-    }
-  
-    SchedulerApplication<FSAppAttempt> application =
-        new SchedulerApplication<FSAppAttempt>(queue, user);
-    applications.put(applicationId, application);
-    queue.getMetrics().submitApp(user);
-
-    LOG.info("Accepted application " + applicationId + " from user: " + user
-        + ", in queue: " + queue.getName()
-        + ", currently num of applications: " + applications.size());
-    if (isAppRecovering) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(applicationId
-            + " is recovering. Skip notifying APP_ACCEPTED");
+      SchedulerApplication<FSAppAttempt> application =
+          new SchedulerApplication<FSAppAttempt>(queue, user);
+      applications.put(applicationId, application);
+      queue.getMetrics().submitApp(user);
+
+        LOG.info("Accepted application " + applicationId + " from user: " + user
+            + ", in queue: " + queue.getName()
+            + ", currently num of applications: " + applications.size());
+      if (isAppRecovering) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(applicationId
+              + " is recovering. Skip notifying APP_ACCEPTED");
+        }
+      } else{
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
       }
-    } else {
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    } finally {
+      writeLock.unlock();
     }
   }
 
   /**
    * Add a new application attempt to the scheduler.
    */
-  protected synchronized void addApplicationAttempt(
+  protected void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
       boolean isAttemptRecovering) {
-    SchedulerApplication<FSAppAttempt> application =
-        applications.get(applicationAttemptId.getApplicationId());
-    String user = application.getUser();
-    FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
-    FSAppAttempt attempt =
-        new FSAppAttempt(this, applicationAttemptId, user,
-            queue, new ActiveUsersManager(getRootQueueMetrics()),
-            rmContext);
-    if (transferStateFromPreviousAttempt) {
-      attempt.transferStateFromPreviousAttempt(application
-          .getCurrentAppAttempt());
-    }
-    application.setCurrentAppAttempt(attempt);
-
-    boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
-    queue.addApp(attempt, runnable);
-    if (runnable) {
-      maxRunningEnforcer.trackRunnableApp(attempt);
-    } else {
-      maxRunningEnforcer.trackNonRunnableApp(attempt);
-    }
-    
-    queue.getMetrics().submitAppAttempt(user);
+    try {
+      writeLock.lock();
+      SchedulerApplication<FSAppAttempt> application = applications.get(
+          applicationAttemptId.getApplicationId());
+      String user = application.getUser();
+      FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+      FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user,
+          queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
+      if (transferStateFromPreviousAttempt) {
+        attempt.transferStateFromPreviousAttempt(
+            application.getCurrentAppAttempt());
+      }
+      application.setCurrentAppAttempt(attempt);
+
+      boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+      queue.addApp(attempt, runnable);
+      if (runnable) {
+        maxRunningEnforcer.trackRunnableApp(attempt);
+      } else{
+        maxRunningEnforcer.trackNonRunnableApp(attempt);
+      }
 
-    LOG.info("Added Application Attempt " + applicationAttemptId
-        + " to scheduler from user: " + user);
+      queue.getMetrics().submitAppAttempt(user);
 
-    if (isAttemptRecovering) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(applicationAttemptId
-            + " is recovering. Skipping notifying ATTEMPT_ADDED");
+      LOG.info("Added Application Attempt " + applicationAttemptId
+          + " to scheduler from user: " + user);
+
+      if (isAttemptRecovering) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(applicationAttemptId
+              + " is recovering. Skipping notifying ATTEMPT_ADDED");
+        }
+      } else{
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppAttemptEvent(applicationAttemptId,
+                RMAppAttemptEventType.ATTEMPT_ADDED));
       }
-    } else {
-      rmContext.getDispatcher().getEventHandler().handle(
-        new RMAppAttemptEvent(applicationAttemptId,
-            RMAppAttemptEventType.ATTEMPT_ADDED));
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -770,70 +796,71 @@ public class FairScheduler extends
     return queue;
   }
 
-  private synchronized void removeApplication(ApplicationId applicationId,
+  private void removeApplication(ApplicationId applicationId,
       RMAppState finalState) {
-    SchedulerApplication<FSAppAttempt> application =
-        applications.get(applicationId);
-    if (application == null){
+    SchedulerApplication<FSAppAttempt> application = applications.remove(
+        applicationId);
+    if (application == null) {
       LOG.warn("Couldn't find application " + applicationId);
-      return;
+    } else{
+      application.stop(finalState);
     }
-    application.stop(finalState);
-    applications.remove(applicationId);
   }
 
-  private synchronized void removeApplicationAttempt(
+  private void removeApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
-    LOG.info("Application " + applicationAttemptId + " is done." +
-        " finalState=" + rmAppAttemptFinalState);
-    SchedulerApplication<FSAppAttempt> application =
-        applications.get(applicationAttemptId.getApplicationId());
-    FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
-
-    if (attempt == null || application == null) {
-      LOG.info("Unknown application " + applicationAttemptId + " has completed!");
-      return;
-    }
-
-    // Release all the running containers
-    for (RMContainer rmContainer : attempt.getLiveContainers()) {
-      if (keepContainers
-          && rmContainer.getState().equals(RMContainerState.RUNNING)) {
-        // do not kill the running container in the case of work-preserving AM
-        // restart.
-        LOG.info("Skip killing " + rmContainer.getContainerId());
-        continue;
+    try {
+      writeLock.lock();
+      LOG.info(
+          "Application " + applicationAttemptId + " is done." + " finalState="
+              + rmAppAttemptFinalState);
+      FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
+
+      if (attempt == null) {
+        LOG.info(
+            "Unknown application " + applicationAttemptId + " has completed!");
+        return;
       }
-      super.completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              SchedulerUtils.COMPLETED_APPLICATION),
-              RMContainerEventType.KILL);
-    }
 
-    // Release all reserved containers
-    for (RMContainer rmContainer : attempt.getReservedContainers()) {
-      super.completedContainer(rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              rmContainer.getContainerId(),
-              "Application Complete"),
-              RMContainerEventType.KILL);
-    }
-    // Clean up pending requests, metrics etc.
-    attempt.stop(rmAppAttemptFinalState);
-
-    // Inform the queue
-    FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
-        .getQueueName(), false);
-    boolean wasRunnable = queue.removeApp(attempt);
+      // Release all the running containers
+      for (RMContainer rmContainer : attempt.getLiveContainers()) {
+        if (keepContainers && rmContainer.getState().equals(
+            RMContainerState.RUNNING)) {
+          // do not kill the running container in the case of work-preserving AM
+          // restart.
+          LOG.info("Skip killing " + rmContainer.getContainerId());
+          continue;
+        }
+        super.completedContainer(rmContainer, SchedulerUtils
+                .createAbnormalContainerStatus(rmContainer.getContainerId(),
+                    SchedulerUtils.COMPLETED_APPLICATION),
+            RMContainerEventType.KILL);
+      }
 
-    if (wasRunnable) {
-      maxRunningEnforcer.untrackRunnableApp(attempt);
-      maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
-          attempt.getQueue());
-    } else {
-      maxRunningEnforcer.untrackNonRunnableApp(attempt);
+      // Release all reserved containers
+      for (RMContainer rmContainer : attempt.getReservedContainers()) {
+        super.completedContainer(rmContainer, SchedulerUtils
+            .createAbnormalContainerStatus(rmContainer.getContainerId(),
+                "Application Complete"), RMContainerEventType.KILL);
+      }
+      // Clean up pending requests, metrics etc.
+      attempt.stop(rmAppAttemptFinalState);
+
+      // Inform the queue
+      FSLeafQueue queue = queueMgr.getLeafQueue(
+          attempt.getQueue().getQueueName(), false);
+      boolean wasRunnable = queue.removeApp(attempt);
+
+      if (wasRunnable) {
+        maxRunningEnforcer.untrackRunnableApp(attempt);
+        maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
+            attempt.getQueue());
+      } else{
+        maxRunningEnforcer.untrackNonRunnableApp(attempt);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -841,97 +868,108 @@ public class FairScheduler extends
    * Clean up a completed container.
    */
   @Override
-  protected synchronized void completedContainerInternal(
+  protected void completedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
       RMContainerEventType event) {
+    try {
+      writeLock.lock();
+      Container container = rmContainer.getContainer();
+
+      // Get the application for the finished container
+      FSAppAttempt application = getCurrentAttemptForContainer(
+          container.getId());
+      ApplicationId appId =
+          container.getId().getApplicationAttemptId().getApplicationId();
+      if (application == null) {
+        LOG.info(
+            "Container " + container + " of" + " finished application " + appId
+                + " completed with event " + event);
+        return;
+      }
 
-    Container container = rmContainer.getContainer();
-
-    // Get the application for the finished container
-    FSAppAttempt application =
-        getCurrentAttemptForContainer(container.getId());
-    ApplicationId appId =
-        container.getId().getApplicationAttemptId().getApplicationId();
-    if (application == null) {
-      LOG.info("Container " + container + " of" +
-          " finished application " + appId +
-          " completed with event " + event);
-      return;
-    }
-
-    // Get the node on which the container was allocated
-    FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
+      // Get the node on which the container was allocated
+      FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
 
-    if (rmContainer.getState() == RMContainerState.RESERVED) {
-      application.unreserve(rmContainer.getReservedSchedulerKey(), node);
-    } else {
-      application.containerCompleted(rmContainer, containerStatus, event);
-      node.releaseContainer(container);
-      updateRootQueueMetrics();
-    }
+      if (rmContainer.getState() == RMContainerState.RESERVED) {
+        application.unreserve(rmContainer.getReservedSchedulerKey(), node);
+      } else{
+        application.containerCompleted(rmContainer, containerStatus, event);
+        node.releaseContainer(container);
+        updateRootQueueMetrics();
+      }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application attempt " + application.getApplicationAttemptId()
-              + " released container " + container.getId() + " on node: " + node
-              + " with event: " + event);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Application attempt " + application.getApplicationAttemptId()
+            + " released container " + container.getId() + " on node: " + node
+            + " with event: " + event);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  private synchronized void addNode(List<NMContainerStatus> containerReports,
+  private void addNode(List<NMContainerStatus> containerReports,
       RMNode node) {
-    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
-    nodeTracker.addNode(schedulerNode);
+    try {
+      writeLock.lock();
+      FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
+          usePortForNodeName);
+      nodeTracker.addNode(schedulerNode);
 
-    triggerUpdate();
+      triggerUpdate();
 
-    Resource clusterResource = getClusterResource();
-    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
-    queueMgr.getRootQueue().recomputeSteadyShares();
-    LOG.info("Added node " + node.getNodeAddress() +
-        " cluster capacity: " + clusterResource);
+      Resource clusterResource = getClusterResource();
+      queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+      queueMgr.getRootQueue().recomputeSteadyShares();
+      LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: "
+          + clusterResource);
 
-    recoverContainersOnNode(containerReports, node);
-    updateRootQueueMetrics();
+      recoverContainersOnNode(containerReports, node);
+      updateRootQueueMetrics();
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized void removeNode(RMNode rmNode) {
-    NodeId nodeId = rmNode.getNodeID();
-    FSSchedulerNode node = nodeTracker.getNode(nodeId);
-    if (node == null) {
-      LOG.error("Attempting to remove non-existent node " + nodeId);
-      return;
-    }
+  private void removeNode(RMNode rmNode) {
+    try {
+      writeLock.lock();
+      NodeId nodeId = rmNode.getNodeID();
+      FSSchedulerNode node = nodeTracker.getNode(nodeId);
+      if (node == null) {
+        LOG.error("Attempting to remove non-existent node " + nodeId);
+        return;
+      }
 
-    // Remove running containers
-    List<RMContainer> runningContainers =
-        node.getCopiedListOfRunningContainers();
-    for (RMContainer container : runningContainers) {
-      super.completedContainer(container,
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
+      // Remove running containers
+      List<RMContainer> runningContainers =
+          node.getCopiedListOfRunningContainers();
+      for (RMContainer container : runningContainers) {
+        super.completedContainer(container, SchedulerUtils
+            .createAbnormalContainerStatus(container.getContainerId(),
+                SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+      }
 
-    // Remove reservations, if any
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      super.completedContainer(reservedContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              reservedContainer.getContainerId(),
-              SchedulerUtils.LOST_CONTAINER),
-          RMContainerEventType.KILL);
-    }
+      // Remove reservations, if any
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        super.completedContainer(reservedContainer, SchedulerUtils
+            .createAbnormalContainerStatus(reservedContainer.getContainerId(),
+                SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+      }
 
-    nodeTracker.removeNode(nodeId);
-    Resource clusterResource = getClusterResource();
-    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
-    queueMgr.getRootQueue().recomputeSteadyShares();
-    updateRootQueueMetrics();
-    triggerUpdate();
+      nodeTracker.removeNode(nodeId);
+      Resource clusterResource = getClusterResource();
+      queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+      queueMgr.getRootQueue().recomputeSteadyShares();
+      updateRootQueueMetrics();
+      triggerUpdate();
 
-    LOG.info("Removed node " + rmNode.getNodeAddress() +
-        " cluster capacity: " + clusterResource);
+      LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: "
+          + clusterResource);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
@@ -960,12 +998,13 @@ public class FairScheduler extends
     // Release containers
     releaseContainers(release, application);
 
-    synchronized (application) {
+    try {
+      application.getWriteLock().lock();
       if (!ask.isEmpty()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("allocate: pre-update" +
-              " applicationAttemptId=" + appAttemptId +
-              " application=" + application.getApplicationId());
+          LOG.debug(
+              "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
+                  + " application=" + application.getApplicationId());
         }
         application.showRequests();
 
@@ -974,98 +1013,107 @@ public class FairScheduler extends
 
         application.showRequests();
       }
+    } finally {
+      application.getWriteLock().unlock();
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: post-update" +
-            " applicationAttemptId=" + appAttemptId +
-            " #ask=" + ask.size() +
-            " reservation= " + application.getCurrentReservation());
-
-        LOG.debug("Preempting " + application.getPreemptionContainers().size()
-            + " container(s)");
-      }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "allocate: post-update" + " applicationAttemptId=" + appAttemptId
+              + " #ask=" + ask.size() + " reservation= " + application
+              .getCurrentReservation());
 
-      Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
-      for (RMContainer container : application.getPreemptionContainers()) {
-        preemptionContainerIds.add(container.getContainerId());
-      }
+      LOG.debug("Preempting " + application.getPreemptionContainers().size()
+          + " container(s)");
+    }
 
-      application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+    for (RMContainer container : application.getPreemptionContainers()) {
+      preemptionContainerIds.add(container.getContainerId());
+    }
 
-      List<Container> newlyAllocatedContainers =
-          application.pullNewlyAllocatedContainers();
-      // Record container allocation time
-      if (!(newlyAllocatedContainers.isEmpty())) {
-        application.recordContainerAllocationTime(getClock().getTime());
-      }
+    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
 
-      Resource headroom = application.getHeadroom();
-      application.setApplicationHeadroomForMetrics(headroom);
-      return new Allocation(newlyAllocatedContainers, headroom,
-          preemptionContainerIds, null, null, application.pullUpdatedNMTokens());
+    List<Container> newlyAllocatedContainers =
+        application.pullNewlyAllocatedContainers();
+    // Record container allocation time
+    if (!(newlyAllocatedContainers.isEmpty())) {
+      application.recordContainerAllocationTime(getClock().getTime());
     }
+
+    Resource headroom = application.getHeadroom();
+    application.setApplicationHeadroomForMetrics(headroom);
+    return new Allocation(newlyAllocatedContainers, headroom,
+        preemptionContainerIds, null, null,
+        application.pullUpdatedNMTokens());
   }
   
   /**
    * Process a heartbeat update from a node.
    */
-  private synchronized void nodeUpdate(RMNode nm) {
-    long start = getClock().getTime();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm +
-          " cluster capacity: " + getClusterResource());
-    }
-    eventLog.log("HEARTBEAT", nm.getHostName());
-    FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-    
-    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
-    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
-    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
-    for(UpdatedContainerInfo containerInfo : containerInfoList) {
-      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
-      completedContainers.addAll(containerInfo.getCompletedContainers());
-    } 
-    // Processing the newly launched containers
-    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
-    }
-
-    // Process completed containers
-    for (ContainerStatus completedContainer : completedContainers) {
-      ContainerId containerId = completedContainer.getContainerId();
-      LOG.debug("Container FINISHED: " + containerId);
-      super.completedContainer(getRMContainer(containerId),
-          completedContainer, RMContainerEventType.FINISHED);
-    }
+  private void nodeUpdate(RMNode nm) {
+    try {
+      writeLock.lock();
+      long start = getClock().getTime();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
+      }
+      eventLog.log("HEARTBEAT", nm.getHostName());
+      FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
+
+      List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+      List<ContainerStatus> newlyLaunchedContainers =
+          new ArrayList<ContainerStatus>();
+      List<ContainerStatus> completedContainers =
+          new ArrayList<ContainerStatus>();
+      for (UpdatedContainerInfo containerInfo : containerInfoList) {
+        newlyLaunchedContainers.addAll(
+            containerInfo.getNewlyLaunchedContainers());
+        completedContainers.addAll(containerInfo.getCompletedContainers());
+      }
+      // Processing the newly launched containers
+      for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+        containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      }
 
-    // If the node is decommissioning, send an update to have the total
-    // resource equal to the used resource, so no available resource to
-    // schedule.
-    if (nm.getState() == NodeState.DECOMMISSIONING) {
-      this.rmContext
-          .getDispatcher()
-          .getEventHandler()
-          .handle(
-              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(nm.getNodeID())
-                      .getAllocatedResource(), 0)));
-    }
+      // Process completed containers
+      for (ContainerStatus completedContainer : completedContainers) {
+        ContainerId containerId = completedContainer.getContainerId();
+        LOG.debug("Container FINISHED: " + containerId);
+        super.completedContainer(getRMContainer(containerId),
+            completedContainer, RMContainerEventType.FINISHED);
+      }
 
-    if (continuousSchedulingEnabled) {
-      if (!completedContainers.isEmpty()) {
+      // If the node is decommissioning, send an update to have the total
+      // resource equal to the used resource, so no available resource to
+      // schedule.
+      if (nm.getState() == NodeState.DECOMMISSIONING) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                .newInstance(
+                    getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
+                    0)));
+      }
+
+      if (continuousSchedulingEnabled) {
+        if (!completedContainers.isEmpty()) {
+          attemptScheduling(node);
+        }
+      } else{
         attemptScheduling(node);
       }
-    } else {
-      attemptScheduling(node);
-    }
 
-    // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
-        nm.getAggregatedContainersUtilization());
-    node.setNodeUtilization(nm.getNodeUtilization());
+      // Updating node resource utilization
+      node.setAggregatedContainersUtilization(
+          nm.getAggregatedContainersUtilization());
+      node.setNodeUtilization(nm.getNodeUtilization());
 
-    long duration = getClock().getTime() - start;
-    fsOpDurations.addNodeUpdateDuration(duration);
+      long duration = getClock().getTime() - start;
+      fsOpDurations.addNodeUpdateDuration(duration);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   void continuousSchedulingAttempt() throws InterruptedException {
@@ -1126,52 +1174,59 @@ public class FairScheduler extends
   }
 
   @VisibleForTesting
-  synchronized void attemptScheduling(FSSchedulerNode node) {
-    if (rmContext.isWorkPreservingRecoveryEnabled()
-        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
-      return;
-    }
+  void attemptScheduling(FSSchedulerNode node) {
+    try {
+      writeLock.lock();
+      if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+          .isSchedulerReadyForAllocatingContainers()) {
+        return;
+      }
 
-    final NodeId nodeID = node.getNodeID();
-    if (!nodeTracker.exists(nodeID)) {
-      // The node might have just been removed while this thread was waiting
-      // on the synchronized lock before it entered this synchronized method
-      LOG.info("Skipping scheduling as the node " + nodeID +
-          " has been removed");
-      return;
-    }
+      final NodeId nodeID = node.getNodeID();
+      if (!nodeTracker.exists(nodeID)) {
+        // The node might have just been removed while this thread was waiting
+        // to acquire the scheduler write lock at the top of this method
+        LOG.info(
+            "Skipping scheduling as the node " + nodeID + " has been removed");
+        return;
+      }
 
-    // Assign new containers...
-    // 1. Check for reserved applications
-    // 2. Schedule if there are no reservations
-
-    boolean validReservation = false;
-    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-    if (reservedAppSchedulable != null) {
-      validReservation = reservedAppSchedulable.assignReservedContainer(node);
-    }
-    if (!validReservation) {
-      // No reservation, schedule at queue which is farthest below fair share
-      int assignedContainers = 0;
-      Resource assignedResource = Resources.clone(Resources.none());
-      Resource maxResourcesToAssign =
-          Resources.multiply(node.getUnallocatedResource(), 0.5f);
-      while (node.getReservedContainer() == null) {
-        boolean assignedContainer = false;
-        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
-        if (!assignment.equals(Resources.none())) {
-          assignedContainers++;
-          assignedContainer = true;
-          Resources.addTo(assignedResource, assignment);
-        }
-        if (!assignedContainer) { break; }
-        if (!shouldContinueAssigning(assignedContainers,
-            maxResourcesToAssign, assignedResource)) {
-          break;
+      // Assign new containers...
+      // 1. Check for reserved applications
+      // 2. Schedule if there are no reservations
+
+      boolean validReservation = false;
+      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+      if (reservedAppSchedulable != null) {
+        validReservation = reservedAppSchedulable.assignReservedContainer(node);
+      }
+      if (!validReservation) {
+        // No reservation, schedule at queue which is farthest below fair share
+        int assignedContainers = 0;
+        Resource assignedResource = Resources.clone(Resources.none());
+        Resource maxResourcesToAssign = Resources.multiply(
+            node.getUnallocatedResource(), 0.5f);
+        while (node.getReservedContainer() == null) {
+          boolean assignedContainer = false;
+          Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+          if (!assignment.equals(Resources.none())) {
+            assignedContainers++;
+            assignedContainer = true;
+            Resources.addTo(assignedResource, assignment);
+          }
+          if (!assignedContainer) {
+            break;
+          }
+          if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+              assignedResource)) {
+            break;
+          }
         }
       }
+      updateRootQueueMetrics();
+    } finally {
+      writeLock.unlock();
     }
-    updateRootQueueMetrics();
   }
 
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@@ -1314,51 +1369,55 @@ public class FairScheduler extends
     }
   }
 
-  private synchronized String resolveReservationQueueName(String queueName,
+  private String resolveReservationQueueName(String queueName,
       ApplicationId applicationId, ReservationId reservationID,
       boolean isRecovering) {
-    FSQueue queue = queueMgr.getQueue(queueName);
-    if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
-      return queueName;
-    }
-    // Use fully specified name from now on (including root. prefix)
-    queueName = queue.getQueueName();
-    if (reservationID != null) {
-      String resQName = queueName + "." + reservationID.toString();
-      queue = queueMgr.getQueue(resQName);
-      if (queue == null) {
-        // reservation has terminated during failover
-        if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
-          // move to the default child queue of the plan
-          return getDefaultQueueForPlanQueue(queueName);
+    try {
+      readLock.lock();
+      FSQueue queue = queueMgr.getQueue(queueName);
+      if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
+        return queueName;
+      }
+      // Use fully specified name from now on (including root. prefix)
+      queueName = queue.getQueueName();
+      if (reservationID != null) {
+        String resQName = queueName + "." + reservationID.toString();
+        queue = queueMgr.getQueue(resQName);
+        if (queue == null) {
+          // reservation has terminated during failover
+          if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
+            // move to the default child queue of the plan
+            return getDefaultQueueForPlanQueue(queueName);
+          }
+          String message = "Application " + applicationId
+              + " submitted to a reservation which is not yet "
+              + "currently active: " + resQName;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return null;
         }
-        String message =
-            "Application "
-                + applicationId
-                + " submitted to a reservation which is not yet currently active: "
-                + resQName;
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMAppEvent(applicationId,
-                RMAppEventType.APP_REJECTED, message));
-        return null;
-      }
-      if (!queue.getParent().getQueueName().equals(queueName)) {
-        String message =
-            "Application: " + applicationId + " submitted to a reservation "
-                + resQName + " which does not belong to the specified queue: "
-                + queueName;
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMAppEvent(applicationId,
-                RMAppEventType.APP_REJECTED, message));
-        return null;
-      }
-      // use the reservation queue to run the app
-      queueName = resQName;
-    } else {
-      // use the default child queue of the plan for unreserved apps
-      queueName = getDefaultQueueForPlanQueue(queueName);
+        if (!queue.getParent().getQueueName().equals(queueName)) {
+          String message =
+              "Application: " + applicationId + " submitted to a reservation "
+                  + resQName + " which does not belong to the specified queue: "
+                  + queueName;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return null;
+        }
+        // use the reservation queue to run the app
+        queueName = resQName;
+      } else{
+        // use the default child queue of the plan for unreserved apps
+        queueName = getDefaultQueueForPlanQueue(queueName);
+      }
+      return queueName;
+    } finally {
+      readLock.unlock();
     }
-    return queueName;
+
   }
 
   private String getDefaultQueueForPlanQueue(String queueName) {
@@ -1372,12 +1431,13 @@ public class FairScheduler extends
     // NOT IMPLEMENTED
   }
 
-  public synchronized void setRMContext(RMContext rmContext) {
+  public void setRMContext(RMContext rmContext) {
     this.rmContext = rmContext;
   }
 
   private void initScheduler(Configuration conf) throws IOException {
-    synchronized (this) {
+    try {
+      writeLock.lock();
       this.conf = new FairSchedulerConfiguration(conf);
       validateConf(this.conf);
       minimumAllocation = this.conf.getMinimumAllocation();
@@ -1385,8 +1445,7 @@ public class FairScheduler extends
       incrAllocation = this.conf.getIncrementAllocation();
       updateReservationThreshold();
       continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-      continuousSchedulingSleepMs =
-          this.conf.getContinuousSchedulingSleepMs();
+      continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs();
       nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
@@ -1407,8 +1466,8 @@ public class FairScheduler extends
       if (updateInterval < 0) {
         updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
         LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
-            + " is invalid, so using default value " +
-            +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+            + " is invalid, so using default value "
+            + +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
             + " ms instead");
       }
 
@@ -1416,8 +1475,7 @@ public class FairScheduler extends
       fsOpDurations = FSOpDurations.getInstance(true);
 
       // This stores per-application scheduling information
-      this.applications = new ConcurrentHashMap<
-          ApplicationId, SchedulerApplication<FSAppAttempt>>();
+      this.applications = new ConcurrentHashMap<>();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
 
@@ -1438,6 +1496,8 @@ public class FairScheduler extends
         schedulingThread.setName("FairSchedulerContinuousScheduling");
         schedulingThread.setDaemon(true);
       }
+    } finally {
+      writeLock.unlock();
     }
 
     allocsLoader.init(conf);
@@ -1460,15 +1520,21 @@ public class FairScheduler extends
     reservationThreshold = newThreshold;
   }
 
-  private synchronized void startSchedulerThreads() {
-    Preconditions.checkNotNull(updateThread, "updateThread is null");
-    Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
-    updateThread.start();
-    if (continuousSchedulingEnabled) {
-      Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
-      schedulingThread.start();
+  private void startSchedulerThreads() {
+    try {
+      writeLock.lock();
+      Preconditions.checkNotNull(updateThread, "updateThread is null");
+      Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+      updateThread.start();
+      if (continuousSchedulingEnabled) {
+        Preconditions.checkNotNull(schedulingThread,
+            "schedulingThread is null");
+        schedulingThread.start();
+      }
+      allocsLoader.start();
+    } finally {
+      writeLock.unlock();
     }
-    allocsLoader.start();
   }
 
   @Override
@@ -1485,7 +1551,8 @@ public class FairScheduler extends
 
   @Override
   public void serviceStop() throws Exception {
-    synchronized (this) {
+    try {
+      writeLock.lock();
       if (updateThread != null) {
         updateThread.interrupt();
         updateThread.join(THREAD_JOIN_TIMEOUT_MS);
@@ -1499,6 +1566,8 @@ public class FairScheduler extends
       if (allocsLoader != null) {
         allocsLoader.stop();
       }
+    } finally {
+      writeLock.unlock();
     }
 
     super.serviceStop();
@@ -1542,17 +1611,22 @@ public class FairScheduler extends
   }
 
   @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+  public boolean checkAccess(UserGroupInformation callerUGI,
       QueueACL acl, String queueName) {
-    FSQueue queue = getQueueManager().getQueue(queueName);
-    if (queue == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ACL not found for queue access-type " + acl
-            + " for queue " + queueName);
+    try {
+      readLock.lock();
+      FSQueue queue = getQueueManager().getQueue(queueName);
+      if (queue == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+              + queueName);
+        }
+        return false;
       }
-      return false;
+      return queue.hasAccess(acl, callerUGI);
+    } finally {
+      readLock.unlock();
     }
-    return queue.hasAccess(acl, callerUGI);
   }
   
   public AllocationConfiguration getAllocationConfiguration() {
@@ -1566,12 +1640,16 @@ public class FairScheduler extends
     public void onReload(AllocationConfiguration queueInfo) {
       // Commit the reload; also create any queue defined in the alloc file
       // if it does not already exist, so it can be displayed on the web UI.
-      synchronized (FairScheduler.this) {
+
+      writeLock.lock();
+      try {
         allocConf = queueInfo;
         allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
         queueMgr.updateAllocationConfiguration(allocConf);
         applyChildDefaults();
         maxRunningEnforcer.updateRunnabilityOnReload();
+      } finally {
+        writeLock.unlock();
       }
     }
   }
@@ -1616,32 +1694,41 @@ public class FairScheduler extends
   }
 
   @Override
-  public synchronized String moveApplication(ApplicationId appId,
+  public String moveApplication(ApplicationId appId,
       String queueName) throws YarnException {
-    SchedulerApplication<FSAppAttempt> app = applications.get(appId);
-    if (app == null) {
-      throw new YarnException("App to be moved " + appId + " not found.");
-    }
-    FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
-    // To serialize with FairScheduler#allocate, synchronize on app attempt
-    synchronized (attempt) {
-      FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
-      String destQueueName = handleMoveToPlanQueue(queueName);
-      FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
-      if (targetQueue == null) {
-        throw new YarnException("Target queue " + queueName
-            + " not found or is not a leaf queue.");
-      }
-      if (targetQueue == oldQueue) {
-        return oldQueue.getQueueName();
+    try {
+      writeLock.lock();
+      SchedulerApplication<FSAppAttempt> app = applications.get(appId);
+      if (app == null) {
+        throw new YarnException("App to be moved " + appId + " not found.");
       }
-      
-      if (oldQueue.isRunnableApp(attempt)) {
-        verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+      FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
+      // Take the attempt's write lock to serialize with FairScheduler#allocate
+
+      try {
+        attempt.getWriteLock().lock();
+        FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
+        String destQueueName = handleMoveToPlanQueue(queueName);
+        FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
+        if (targetQueue == null) {
+          throw new YarnException("Target queue " + queueName
+              + " not found or is not a leaf queue.");
+        }
+        if (targetQueue == oldQueue) {
+          return oldQueue.getQueueName();
+        }
+
+        if (oldQueue.isRunnableApp(attempt)) {
+          verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+        }
+
+        executeMove(app, attempt, oldQueue, targetQueue);
+        return targetQueue.getQueueName();
+      } finally {
+        attempt.getWriteLock().unlock();
       }
-      
-      executeMove(app, attempt, oldQueue, targetQueue);
-      return targetQueue.getQueueName();
+    } finally {
+      writeLock.unlock();
     }
   }
   
@@ -1737,12 +1824,17 @@ public class FairScheduler extends
    * Process resource update on a node and update Queue.
    */
   @Override
-  public synchronized void updateNodeResource(RMNode nm, 
+  public void updateNodeResource(RMNode nm,
       ResourceOption resourceOption) {
-    super.updateNodeResource(nm, resourceOption);
-    updateRootQueueMetrics();
-    queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
-    queueMgr.getRootQueue().recomputeSteadyShares();
+    try {
+      writeLock.lock();
+      super.updateNodeResource(nm, resourceOption);
+      updateRootQueueMetrics();
+      queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
+      queueMgr.getRootQueue().recomputeSteadyShares();
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /** {@inheritDoc} */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org