You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/10/08 06:10:06 UTC
[10/50] [abbrv] hadoop git commit: YARN-3139. Improve locks in
AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda
Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6129772..eecd4ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -2227,6 +2227,22 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+ /** Removes app's current attempt from the ordering policy, applies newAppPriority, then re-adds it. */
+ public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
+ Priority newAppPriority) {
+ // Lock before try: if lock() throws, finally must not call unlock() on an unheld lock.
+ writeLock.lock();
+ try {
+ FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
+ // Presumably the policy orders by priority, hence remove-before-mutate; confirm.
+ getOrderingPolicy().removeSchedulableEntity(attempt);
+ attempt.setPriority(newAppPriority);
+ getOrderingPolicy().addSchedulableEntity(attempt);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
public OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
return pendingOrderingPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index fd43e74..aa7ad50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -666,6 +667,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} finally {
writeLock.unlock();
}
+ }
+ public ReentrantReadWriteLock.WriteLock getWriteLock() {
+ return this.writeLock; // exposes the attempt's own WriteLock instance to callers
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 920052f..8daf0f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -186,10 +186,13 @@ public class FairScheduler extends
// an app can be reserved on
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
- protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
- protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
+ // Continuous Scheduling enabled or not
+ protected boolean continuousSchedulingEnabled;
+ // Sleep time for each pass in continuous scheduling
+ protected volatile int continuousSchedulingSleepMs;
+ // Node available resource comparator
private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
- new NodeAvailableResourceComparator(); // Node available resource comparator
+ new NodeAvailableResourceComparator();
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
protected long nodeLocalityDelayMs; // Delay for node locality
@@ -338,36 +341,40 @@ public class FairScheduler extends
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
- protected synchronized void update() {
- long start = getClock().getTime();
- updateStarvationStats(); // Determine if any queues merit preemption
+ protected void update() {
+ try {
+ writeLock.lock();
+ long start = getClock().getTime();
+ updateStarvationStats(); // Determine if any queues merit preemption
- FSQueue rootQueue = queueMgr.getRootQueue();
+ FSQueue rootQueue = queueMgr.getRootQueue();
- // Recursively update demands for all queues
- rootQueue.updateDemand();
+ // Recursively update demands for all queues
+ rootQueue.updateDemand();
- Resource clusterResource = getClusterResource();
- rootQueue.setFairShare(clusterResource);
- // Recursively compute fair shares for all queues
- // and update metrics
- rootQueue.recomputeShares();
- updateRootQueueMetrics();
+ Resource clusterResource = getClusterResource();
+ rootQueue.setFairShare(clusterResource);
+ // Recursively compute fair shares for all queues
+ // and update metrics
+ rootQueue.recomputeShares();
+ updateRootQueueMetrics();
- if (LOG.isDebugEnabled()) {
- if (--updatesToSkipForDebug < 0) {
- updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
- LOG.debug("Cluster Capacity: " + clusterResource +
- " Allocations: " + rootMetrics.getAllocatedResources() +
- " Availability: " + Resource.newInstance(
- rootMetrics.getAvailableMB(),
- rootMetrics.getAvailableVirtualCores()) +
- " Demand: " + rootQueue.getDemand());
+ if (LOG.isDebugEnabled()) {
+ if (--updatesToSkipForDebug < 0) {
+ updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
+ LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: "
+ + rootMetrics.getAllocatedResources() + " Availability: "
+ + Resource.newInstance(rootMetrics.getAvailableMB(),
+ rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue
+ .getDemand());
+ }
}
- }
- long duration = getClock().getTime() - start;
- fsOpDurations.addUpdateCallDuration(duration);
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addUpdateCallDuration(duration);
+ } finally {
+ writeLock.unlock();
+ }
}
/**
@@ -389,23 +396,28 @@ public class FairScheduler extends
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
- protected synchronized void preemptTasksIfNecessary() {
- if (!shouldAttemptPreemption()) {
- return;
- }
+ protected void preemptTasksIfNecessary() {
+ try {
+ writeLock.lock();
+ if (!shouldAttemptPreemption()) {
+ return;
+ }
- long curTime = getClock().getTime();
- if (curTime - lastPreemptCheckTime < preemptionInterval) {
- return;
- }
- lastPreemptCheckTime = curTime;
+ long curTime = getClock().getTime();
+ if (curTime - lastPreemptCheckTime < preemptionInterval) {
+ return;
+ }
+ lastPreemptCheckTime = curTime;
- Resource resToPreempt = Resources.clone(Resources.none());
- for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
- Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
- }
- if (isResourceGreaterThanNone(resToPreempt)) {
- preemptResources(resToPreempt);
+ Resource resToPreempt = Resources.clone(Resources.none());
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
+ Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
+ }
+ if (isResourceGreaterThanNone(resToPreempt)) {
+ preemptResources(resToPreempt);
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -549,22 +561,27 @@ public class FairScheduler extends
return deficit;
}
- public synchronized RMContainerTokenSecretManager
+ public RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
- // synchronized for sizeBasedWeight
- public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
- double weight = 1.0;
- if (sizeBasedWeight) {
- // Set weight based on current memory demand
- weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+ public ResourceWeights getAppWeight(FSAppAttempt app) {
+ try {
+ readLock.lock();
+ double weight = 1.0;
+ if (sizeBasedWeight) {
+ // Set weight based on current memory demand
+ weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
+ }
+ weight *= app.getPriority().getPriority();
+ ResourceWeights resourceWeights = app.getResourceWeights();
+ resourceWeights.setWeight((float) weight);
+ return resourceWeights;
+ } finally {
+ readLock.unlock();
}
- weight *= app.getPriority().getPriority();
- ResourceWeights resourceWeights = app.getResourceWeights();
- resourceWeights.setWeight((float)weight);
- return resourceWeights;
+
}
public Resource getIncrementResourceCapability() {
@@ -595,7 +612,7 @@ public class FairScheduler extends
return continuousSchedulingEnabled;
}
- public synchronized int getContinuousSchedulingSleepMs() {
+ public int getContinuousSchedulingSleepMs() {
return continuousSchedulingSleepMs;
}
@@ -617,114 +634,123 @@ public class FairScheduler extends
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void addApplication(ApplicationId applicationId,
+ protected void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
if (queueName == null || queueName.isEmpty()) {
- String message = "Reject application " + applicationId +
- " submitted by user " + user + " with an empty queue name.";
+ String message =
+ "Reject application " + applicationId + " submitted by user " + user
+ + " with an empty queue name.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
if (queueName.startsWith(".") || queueName.endsWith(".")) {
- String message = "Reject application " + applicationId
- + " submitted by user " + user + " with an illegal queue name "
- + queueName + ". "
- + "The queue name cannot start/end with period.";
+ String message =
+ "Reject application " + applicationId + " submitted by user " + user
+ + " with an illegal queue name " + queueName + ". "
+ + "The queue name cannot start/end with period.";
LOG.info(message);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
- FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
- if (queue == null) {
- return;
- }
+ try {
+ writeLock.lock();
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+ if (queue == null) {
+ return;
+ }
- // Enforce ACLs
- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
+ // Enforce ACLs
+ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+ user);
+
+ if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
+ .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+ String msg = "User " + userUgi.getUserName()
+ + " cannot submit applications to queue " + queue.getName()
+ + "(requested queuename is " + queueName + ")";
+ LOG.info(msg);
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
+ return;
+ }
- if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
- && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
- String msg = "User " + userUgi.getUserName() +
- " cannot submit applications to queue " + queue.getName() +
- "(requested queuename is " + queueName + ")";
- LOG.info(msg);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, msg));
- return;
- }
-
- SchedulerApplication<FSAppAttempt> application =
- new SchedulerApplication<FSAppAttempt>(queue, user);
- applications.put(applicationId, application);
- queue.getMetrics().submitApp(user);
-
- LOG.info("Accepted application " + applicationId + " from user: " + user
- + ", in queue: " + queue.getName()
- + ", currently num of applications: " + applications.size());
- if (isAppRecovering) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationId
- + " is recovering. Skip notifying APP_ACCEPTED");
+ SchedulerApplication<FSAppAttempt> application =
+ new SchedulerApplication<FSAppAttempt>(queue, user);
+ applications.put(applicationId, application);
+ queue.getMetrics().submitApp(user);
+
+ LOG.info("Accepted application " + applicationId + " from user: " + user
+ + ", in queue: " + queue.getName()
+ + ", currently num of applications: " + applications.size());
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId
+ + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else{
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
- } else {
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ } finally {
+ writeLock.unlock();
}
}
/**
* Add a new application attempt to the scheduler.
*/
- protected synchronized void addApplicationAttempt(
+ protected void addApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
- SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationAttemptId.getApplicationId());
- String user = application.getUser();
- FSLeafQueue queue = (FSLeafQueue) application.getQueue();
-
- FSAppAttempt attempt =
- new FSAppAttempt(this, applicationAttemptId, user,
- queue, new ActiveUsersManager(getRootQueueMetrics()),
- rmContext);
- if (transferStateFromPreviousAttempt) {
- attempt.transferStateFromPreviousAttempt(application
- .getCurrentAppAttempt());
- }
- application.setCurrentAppAttempt(attempt);
-
- boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
- queue.addApp(attempt, runnable);
- if (runnable) {
- maxRunningEnforcer.trackRunnableApp(attempt);
- } else {
- maxRunningEnforcer.trackNonRunnableApp(attempt);
- }
-
- queue.getMetrics().submitAppAttempt(user);
+ try {
+ writeLock.lock();
+ SchedulerApplication<FSAppAttempt> application = applications.get(
+ applicationAttemptId.getApplicationId());
+ String user = application.getUser();
+ FSLeafQueue queue = (FSLeafQueue) application.getQueue();
+
+ FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user,
+ queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
+ if (transferStateFromPreviousAttempt) {
+ attempt.transferStateFromPreviousAttempt(
+ application.getCurrentAppAttempt());
+ }
+ application.setCurrentAppAttempt(attempt);
+
+ boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ queue.addApp(attempt, runnable);
+ if (runnable) {
+ maxRunningEnforcer.trackRunnableApp(attempt);
+ } else{
+ maxRunningEnforcer.trackNonRunnableApp(attempt);
+ }
- LOG.info("Added Application Attempt " + applicationAttemptId
- + " to scheduler from user: " + user);
+ queue.getMetrics().submitAppAttempt(user);
- if (isAttemptRecovering) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(applicationAttemptId
- + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ LOG.info("Added Application Attempt " + applicationAttemptId
+ + " to scheduler from user: " + user);
+
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+ } else{
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptEvent(applicationAttemptId,
+ RMAppAttemptEventType.ATTEMPT_ADDED));
}
- } else {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ } finally {
+ writeLock.unlock();
}
}
@@ -770,70 +796,71 @@ public class FairScheduler extends
return queue;
}
- private synchronized void removeApplication(ApplicationId applicationId,
+ private void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationId);
- if (application == null){
+ SchedulerApplication<FSAppAttempt> application = applications.remove(
+ applicationId);
+ if (application == null) {
LOG.warn("Couldn't find application " + applicationId);
- return;
+ } else{
+ application.stop(finalState);
}
- application.stop(finalState);
- applications.remove(applicationId);
}
- private synchronized void removeApplicationAttempt(
+ private void removeApplicationAttempt(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
- LOG.info("Application " + applicationAttemptId + " is done." +
- " finalState=" + rmAppAttemptFinalState);
- SchedulerApplication<FSAppAttempt> application =
- applications.get(applicationAttemptId.getApplicationId());
- FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
-
- if (attempt == null || application == null) {
- LOG.info("Unknown application " + applicationAttemptId + " has completed!");
- return;
- }
-
- // Release all the running containers
- for (RMContainer rmContainer : attempt.getLiveContainers()) {
- if (keepContainers
- && rmContainer.getState().equals(RMContainerState.RUNNING)) {
- // do not kill the running container in the case of work-preserving AM
- // restart.
- LOG.info("Skip killing " + rmContainer.getContainerId());
- continue;
+ try {
+ writeLock.lock();
+ LOG.info(
+ "Application " + applicationAttemptId + " is done." + " finalState="
+ + rmAppAttemptFinalState);
+ FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId);
+
+ if (attempt == null) {
+ LOG.info(
+ "Unknown application " + applicationAttemptId + " has completed!");
+ return;
}
- super.completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
- }
- // Release all reserved containers
- for (RMContainer rmContainer : attempt.getReservedContainers()) {
- super.completedContainer(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- rmContainer.getContainerId(),
- "Application Complete"),
- RMContainerEventType.KILL);
- }
- // Clean up pending requests, metrics etc.
- attempt.stop(rmAppAttemptFinalState);
-
- // Inform the queue
- FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue()
- .getQueueName(), false);
- boolean wasRunnable = queue.removeApp(attempt);
+ // Release all the running containers
+ for (RMContainer rmContainer : attempt.getLiveContainers()) {
+ if (keepContainers && rmContainer.getState().equals(
+ RMContainerState.RUNNING)) {
+ // do not kill the running container in the case of work-preserving AM
+ // restart.
+ LOG.info("Skip killing " + rmContainer.getContainerId());
+ continue;
+ }
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ SchedulerUtils.COMPLETED_APPLICATION),
+ RMContainerEventType.KILL);
+ }
- if (wasRunnable) {
- maxRunningEnforcer.untrackRunnableApp(attempt);
- maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
- attempt.getQueue());
- } else {
- maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ // Release all reserved containers
+ for (RMContainer rmContainer : attempt.getReservedContainers()) {
+ super.completedContainer(rmContainer, SchedulerUtils
+ .createAbnormalContainerStatus(rmContainer.getContainerId(),
+ "Application Complete"), RMContainerEventType.KILL);
+ }
+ // Clean up pending requests, metrics etc.
+ attempt.stop(rmAppAttemptFinalState);
+
+ // Inform the queue
+ FSLeafQueue queue = queueMgr.getLeafQueue(
+ attempt.getQueue().getQueueName(), false);
+ boolean wasRunnable = queue.removeApp(attempt);
+
+ if (wasRunnable) {
+ maxRunningEnforcer.untrackRunnableApp(attempt);
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
+ attempt.getQueue());
+ } else{
+ maxRunningEnforcer.untrackNonRunnableApp(attempt);
+ }
+ } finally {
+ writeLock.unlock();
}
}
@@ -841,97 +868,108 @@ public class FairScheduler extends
* Clean up a completed container.
*/
@Override
- protected synchronized void completedContainerInternal(
+ protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
+ try {
+ writeLock.lock();
+ Container container = rmContainer.getContainer();
+
+ // Get the application for the finished container
+ FSAppAttempt application = getCurrentAttemptForContainer(
+ container.getId());
+ ApplicationId appId =
+ container.getId().getApplicationAttemptId().getApplicationId();
+ if (application == null) {
+ LOG.info(
+ "Container " + container + " of" + " finished application " + appId
+ + " completed with event " + event);
+ return;
+ }
- Container container = rmContainer.getContainer();
-
- // Get the application for the finished container
- FSAppAttempt application =
- getCurrentAttemptForContainer(container.getId());
- ApplicationId appId =
- container.getId().getApplicationAttemptId().getApplicationId();
- if (application == null) {
- LOG.info("Container " + container + " of" +
- " finished application " + appId +
- " completed with event " + event);
- return;
- }
-
- // Get the node on which the container was allocated
- FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
+ // Get the node on which the container was allocated
+ FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
- if (rmContainer.getState() == RMContainerState.RESERVED) {
- application.unreserve(rmContainer.getReservedSchedulerKey(), node);
- } else {
- application.containerCompleted(rmContainer, containerStatus, event);
- node.releaseContainer(container);
- updateRootQueueMetrics();
- }
+ if (rmContainer.getState() == RMContainerState.RESERVED) {
+ application.unreserve(rmContainer.getReservedSchedulerKey(), node);
+ } else{
+ application.containerCompleted(rmContainer, containerStatus, event);
+ node.releaseContainer(container);
+ updateRootQueueMetrics();
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Application attempt " + application.getApplicationAttemptId()
- + " released container " + container.getId() + " on node: " + node
- + " with event: " + event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Application attempt " + application.getApplicationAttemptId()
+ + " released container " + container.getId() + " on node: " + node
+ + " with event: " + event);
+ }
+ } finally {
+ writeLock.unlock();
}
}
- private synchronized void addNode(List<NMContainerStatus> containerReports,
+ private void addNode(List<NMContainerStatus> containerReports,
RMNode node) {
- FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
- nodeTracker.addNode(schedulerNode);
+ try {
+ writeLock.lock();
+ FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
+ usePortForNodeName);
+ nodeTracker.addNode(schedulerNode);
- triggerUpdate();
+ triggerUpdate();
- Resource clusterResource = getClusterResource();
- queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
- queueMgr.getRootQueue().recomputeSteadyShares();
- LOG.info("Added node " + node.getNodeAddress() +
- " cluster capacity: " + clusterResource);
+ Resource clusterResource = getClusterResource();
+ queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: "
+ + clusterResource);
- recoverContainersOnNode(containerReports, node);
- updateRootQueueMetrics();
+ recoverContainersOnNode(containerReports, node);
+ updateRootQueueMetrics();
+ } finally {
+ writeLock.unlock();
+ }
}
- private synchronized void removeNode(RMNode rmNode) {
- NodeId nodeId = rmNode.getNodeID();
- FSSchedulerNode node = nodeTracker.getNode(nodeId);
- if (node == null) {
- LOG.error("Attempting to remove non-existent node " + nodeId);
- return;
- }
+ private void removeNode(RMNode rmNode) {
+ try {
+ writeLock.lock();
+ NodeId nodeId = rmNode.getNodeID();
+ FSSchedulerNode node = nodeTracker.getNode(nodeId);
+ if (node == null) {
+ LOG.error("Attempting to remove non-existent node " + nodeId);
+ return;
+ }
- // Remove running containers
- List<RMContainer> runningContainers =
- node.getCopiedListOfRunningContainers();
- for (RMContainer container : runningContainers) {
- super.completedContainer(container,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
+ // Remove running containers
+ List<RMContainer> runningContainers =
+ node.getCopiedListOfRunningContainers();
+ for (RMContainer container : runningContainers) {
+ super.completedContainer(container, SchedulerUtils
+ .createAbnormalContainerStatus(container.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
- // Remove reservations, if any
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- super.completedContainer(reservedContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- reservedContainer.getContainerId(),
- SchedulerUtils.LOST_CONTAINER),
- RMContainerEventType.KILL);
- }
+ // Remove reservations, if any
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ super.completedContainer(reservedContainer, SchedulerUtils
+ .createAbnormalContainerStatus(reservedContainer.getContainerId(),
+ SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+ }
- nodeTracker.removeNode(nodeId);
- Resource clusterResource = getClusterResource();
- queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
- queueMgr.getRootQueue().recomputeSteadyShares();
- updateRootQueueMetrics();
- triggerUpdate();
+ nodeTracker.removeNode(nodeId);
+ Resource clusterResource = getClusterResource();
+ queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ updateRootQueueMetrics();
+ triggerUpdate();
- LOG.info("Removed node " + rmNode.getNodeAddress() +
- " cluster capacity: " + clusterResource);
+ LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: "
+ + clusterResource);
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
@@ -960,12 +998,13 @@ public class FairScheduler extends
// Release containers
releaseContainers(release, application);
- synchronized (application) {
+ try {
+ application.getWriteLock().lock();
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: pre-update" +
- " applicationAttemptId=" + appAttemptId +
- " application=" + application.getApplicationId());
+ LOG.debug(
+ "allocate: pre-update" + " applicationAttemptId=" + appAttemptId
+ + " application=" + application.getApplicationId());
}
application.showRequests();
@@ -974,98 +1013,107 @@ public class FairScheduler extends
application.showRequests();
}
+ } finally {
+ application.getWriteLock().unlock();
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: post-update" +
- " applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size() +
- " reservation= " + application.getCurrentReservation());
-
- LOG.debug("Preempting " + application.getPreemptionContainers().size()
- + " container(s)");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "allocate: post-update" + " applicationAttemptId=" + appAttemptId
+ + " #ask=" + ask.size() + " reservation= " + application
+ .getCurrentReservation());
- Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
- for (RMContainer container : application.getPreemptionContainers()) {
- preemptionContainerIds.add(container.getContainerId());
- }
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ + " container(s)");
+ }
- application.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+ for (RMContainer container : application.getPreemptionContainers()) {
+ preemptionContainerIds.add(container.getContainerId());
+ }
- List<Container> newlyAllocatedContainers =
- application.pullNewlyAllocatedContainers();
- // Record container allocation time
- if (!(newlyAllocatedContainers.isEmpty())) {
- application.recordContainerAllocationTime(getClock().getTime());
- }
+ application.updateBlacklist(blacklistAdditions, blacklistRemovals);
- Resource headroom = application.getHeadroom();
- application.setApplicationHeadroomForMetrics(headroom);
- return new Allocation(newlyAllocatedContainers, headroom,
- preemptionContainerIds, null, null, application.pullUpdatedNMTokens());
+ List<Container> newlyAllocatedContainers =
+ application.pullNewlyAllocatedContainers();
+ // Record container allocation time
+ if (!(newlyAllocatedContainers.isEmpty())) {
+ application.recordContainerAllocationTime(getClock().getTime());
}
+
+ Resource headroom = application.getHeadroom();
+ application.setApplicationHeadroomForMetrics(headroom);
+ return new Allocation(newlyAllocatedContainers, headroom,
+ preemptionContainerIds, null, null,
+ application.pullUpdatedNMTokens());
}
/**
* Process a heartbeat update from a node.
*/
- private synchronized void nodeUpdate(RMNode nm) {
- long start = getClock().getTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("nodeUpdate: " + nm +
- " cluster capacity: " + getClusterResource());
- }
- eventLog.log("HEARTBEAT", nm.getHostName());
- FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-
- List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
- List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
- List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
- for(UpdatedContainerInfo containerInfo : containerInfoList) {
- newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
- completedContainers.addAll(containerInfo.getCompletedContainers());
- }
- // Processing the newly launched containers
- for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
- }
-
- // Process completed containers
- for (ContainerStatus completedContainer : completedContainers) {
- ContainerId containerId = completedContainer.getContainerId();
- LOG.debug("Container FINISHED: " + containerId);
- super.completedContainer(getRMContainer(containerId),
- completedContainer, RMContainerEventType.FINISHED);
- }
+ private void nodeUpdate(RMNode nm) {
+ try {
+ writeLock.lock();
+ long start = getClock().getTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
+ }
+ eventLog.log("HEARTBEAT", nm.getHostName());
+ FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
+
+ List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
+ List<ContainerStatus> newlyLaunchedContainers =
+ new ArrayList<ContainerStatus>();
+ List<ContainerStatus> completedContainers =
+ new ArrayList<ContainerStatus>();
+ for (UpdatedContainerInfo containerInfo : containerInfoList) {
+ newlyLaunchedContainers.addAll(
+ containerInfo.getNewlyLaunchedContainers());
+ completedContainers.addAll(containerInfo.getCompletedContainers());
+ }
+ // Processing the newly launched containers
+ for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
+ containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+ }
- // If the node is decommissioning, send an update to have the total
- // resource equal to the used resource, so no available resource to
- // schedule.
- if (nm.getState() == NodeState.DECOMMISSIONING) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(getSchedulerNode(nm.getNodeID())
- .getAllocatedResource(), 0)));
- }
+ // Process completed containers
+ for (ContainerStatus completedContainer : completedContainers) {
+ ContainerId containerId = completedContainer.getContainerId();
+ LOG.debug("Container FINISHED: " + containerId);
+ super.completedContainer(getRMContainer(containerId),
+ completedContainer, RMContainerEventType.FINISHED);
+ }
- if (continuousSchedulingEnabled) {
- if (!completedContainers.isEmpty()) {
+ // If the node is decommissioning, send an update to have the total
+ // resource equal to the used resource, so no available resource to
+ // schedule.
+ if (nm.getState() == NodeState.DECOMMISSIONING) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+ .newInstance(
+ getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
+ 0)));
+ }
+
+ if (continuousSchedulingEnabled) {
+ if (!completedContainers.isEmpty()) {
+ attemptScheduling(node);
+ }
+ } else{
attemptScheduling(node);
}
- } else {
- attemptScheduling(node);
- }
- // Updating node resource utilization
- node.setAggregatedContainersUtilization(
- nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
+ // Updating node resource utilization
+ node.setAggregatedContainersUtilization(
+ nm.getAggregatedContainersUtilization());
+ node.setNodeUtilization(nm.getNodeUtilization());
- long duration = getClock().getTime() - start;
- fsOpDurations.addNodeUpdateDuration(duration);
+ long duration = getClock().getTime() - start;
+ fsOpDurations.addNodeUpdateDuration(duration);
+ } finally {
+ writeLock.unlock();
+ }
}
void continuousSchedulingAttempt() throws InterruptedException {
@@ -1126,52 +1174,59 @@ public class FairScheduler extends
}
@VisibleForTesting
- synchronized void attemptScheduling(FSSchedulerNode node) {
- if (rmContext.isWorkPreservingRecoveryEnabled()
- && !rmContext.isSchedulerReadyForAllocatingContainers()) {
- return;
- }
+ void attemptScheduling(FSSchedulerNode node) {
+ try {
+ writeLock.lock();
+ if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+ .isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
- final NodeId nodeID = node.getNodeID();
- if (!nodeTracker.exists(nodeID)) {
- // The node might have just been removed while this thread was waiting
- // on the synchronized lock before it entered this synchronized method
- LOG.info("Skipping scheduling as the node " + nodeID +
- " has been removed");
- return;
- }
+ final NodeId nodeID = node.getNodeID();
+ if (!nodeTracker.exists(nodeID)) {
+ // The node might have just been removed while this thread was waiting
+ // to acquire the scheduler write lock taken at the top of this method
+ LOG.info(
+ "Skipping scheduling as the node " + nodeID + " has been removed");
+ return;
+ }
- // Assign new containers...
- // 1. Check for reserved applications
- // 2. Schedule if there are no reservations
-
- boolean validReservation = false;
- FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
- if (reservedAppSchedulable != null) {
- validReservation = reservedAppSchedulable.assignReservedContainer(node);
- }
- if (!validReservation) {
- // No reservation, schedule at queue which is farthest below fair share
- int assignedContainers = 0;
- Resource assignedResource = Resources.clone(Resources.none());
- Resource maxResourcesToAssign =
- Resources.multiply(node.getUnallocatedResource(), 0.5f);
- while (node.getReservedContainer() == null) {
- boolean assignedContainer = false;
- Resource assignment = queueMgr.getRootQueue().assignContainer(node);
- if (!assignment.equals(Resources.none())) {
- assignedContainers++;
- assignedContainer = true;
- Resources.addTo(assignedResource, assignment);
- }
- if (!assignedContainer) { break; }
- if (!shouldContinueAssigning(assignedContainers,
- maxResourcesToAssign, assignedResource)) {
- break;
+ // Assign new containers...
+ // 1. Check for reserved applications
+ // 2. Schedule if there are no reservations
+
+ boolean validReservation = false;
+ FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+ if (reservedAppSchedulable != null) {
+ validReservation = reservedAppSchedulable.assignReservedContainer(node);
+ }
+ if (!validReservation) {
+ // No reservation, schedule at queue which is farthest below fair share
+ int assignedContainers = 0;
+ Resource assignedResource = Resources.clone(Resources.none());
+ Resource maxResourcesToAssign = Resources.multiply(
+ node.getUnallocatedResource(), 0.5f);
+ while (node.getReservedContainer() == null) {
+ boolean assignedContainer = false;
+ Resource assignment = queueMgr.getRootQueue().assignContainer(node);
+ if (!assignment.equals(Resources.none())) {
+ assignedContainers++;
+ assignedContainer = true;
+ Resources.addTo(assignedResource, assignment);
+ }
+ if (!assignedContainer) {
+ break;
+ }
+ if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+ assignedResource)) {
+ break;
+ }
}
}
+ updateRootQueueMetrics();
+ } finally {
+ writeLock.unlock();
}
- updateRootQueueMetrics();
}
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
@@ -1314,51 +1369,55 @@ public class FairScheduler extends
}
}
- private synchronized String resolveReservationQueueName(String queueName,
+ private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
- FSQueue queue = queueMgr.getQueue(queueName);
- if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
- return queueName;
- }
- // Use fully specified name from now on (including root. prefix)
- queueName = queue.getQueueName();
- if (reservationID != null) {
- String resQName = queueName + "." + reservationID.toString();
- queue = queueMgr.getQueue(resQName);
- if (queue == null) {
- // reservation has terminated during failover
- if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
- // move to the default child queue of the plan
- return getDefaultQueueForPlanQueue(queueName);
+ try {
+ readLock.lock();
+ FSQueue queue = queueMgr.getQueue(queueName);
+ if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
+ return queueName;
+ }
+ // Use fully specified name from now on (including root. prefix)
+ queueName = queue.getQueueName();
+ if (reservationID != null) {
+ String resQName = queueName + "." + reservationID.toString();
+ queue = queueMgr.getQueue(resQName);
+ if (queue == null) {
+ // reservation has terminated during failover
+ if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
+ // move to the default child queue of the plan
+ return getDefaultQueueForPlanQueue(queueName);
+ }
+ String message = "Application " + applicationId
+ + " submitted to a reservation which is not yet "
+ + "currently active: " + resQName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
}
- String message =
- "Application "
- + applicationId
- + " submitted to a reservation which is not yet currently active: "
- + resQName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
- }
- if (!queue.getParent().getQueueName().equals(queueName)) {
- String message =
- "Application: " + applicationId + " submitted to a reservation "
- + resQName + " which does not belong to the specified queue: "
- + queueName;
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId,
- RMAppEventType.APP_REJECTED, message));
- return null;
- }
- // use the reservation queue to run the app
- queueName = resQName;
- } else {
- // use the default child queue of the plan for unreserved apps
- queueName = getDefaultQueueForPlanQueue(queueName);
+ if (!queue.getParent().getQueueName().equals(queueName)) {
+ String message =
+ "Application: " + applicationId + " submitted to a reservation "
+ + resQName + " which does not belong to the specified queue: "
+ + queueName;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return null;
+ }
+ // use the reservation queue to run the app
+ queueName = resQName;
+ } else{
+ // use the default child queue of the plan for unreserved apps
+ queueName = getDefaultQueueForPlanQueue(queueName);
+ }
+ return queueName;
+ } finally {
+ readLock.unlock();
}
- return queueName;
+
}
private String getDefaultQueueForPlanQueue(String queueName) {
@@ -1372,12 +1431,13 @@ public class FairScheduler extends
// NOT IMPLEMENTED
}
- public synchronized void setRMContext(RMContext rmContext) {
+ public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
private void initScheduler(Configuration conf) throws IOException {
- synchronized (this) {
+ try {
+ writeLock.lock();
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation();
@@ -1385,8 +1445,7 @@ public class FairScheduler extends
incrAllocation = this.conf.getIncrementAllocation();
updateReservationThreshold();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
+ continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
@@ -1407,8 +1466,8 @@ public class FairScheduler extends
if (updateInterval < 0) {
updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
- + " is invalid, so using default value " +
- +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ + " is invalid, so using default value "
+ + +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
+ " ms instead");
}
@@ -1416,8 +1475,7 @@ public class FairScheduler extends
fsOpDurations = FSOpDurations.getInstance(true);
// This stores per-application scheduling information
- this.applications = new ConcurrentHashMap<
- ApplicationId, SchedulerApplication<FSAppAttempt>>();
+ this.applications = new ConcurrentHashMap<>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@@ -1438,6 +1496,8 @@ public class FairScheduler extends
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
+ } finally {
+ writeLock.unlock();
}
allocsLoader.init(conf);
@@ -1460,15 +1520,21 @@ public class FairScheduler extends
reservationThreshold = newThreshold;
}
- private synchronized void startSchedulerThreads() {
- Preconditions.checkNotNull(updateThread, "updateThread is null");
- Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
- updateThread.start();
- if (continuousSchedulingEnabled) {
- Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
- schedulingThread.start();
+ private void startSchedulerThreads() {
+ try {
+ writeLock.lock();
+ Preconditions.checkNotNull(updateThread, "updateThread is null");
+ Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
+ updateThread.start();
+ if (continuousSchedulingEnabled) {
+ Preconditions.checkNotNull(schedulingThread,
+ "schedulingThread is null");
+ schedulingThread.start();
+ }
+ allocsLoader.start();
+ } finally {
+ writeLock.unlock();
}
- allocsLoader.start();
}
@Override
@@ -1485,7 +1551,8 @@ public class FairScheduler extends
@Override
public void serviceStop() throws Exception {
- synchronized (this) {
+ try {
+ writeLock.lock();
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
@@ -1499,6 +1566,8 @@ public class FairScheduler extends
if (allocsLoader != null) {
allocsLoader.stop();
}
+ } finally {
+ writeLock.unlock();
}
super.serviceStop();
@@ -1542,17 +1611,22 @@ public class FairScheduler extends
}
@Override
- public synchronized boolean checkAccess(UserGroupInformation callerUGI,
+ public boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
- FSQueue queue = getQueueManager().getQueue(queueName);
- if (queue == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ACL not found for queue access-type " + acl
- + " for queue " + queueName);
+ try {
+ readLock.lock();
+ FSQueue queue = getQueueManager().getQueue(queueName);
+ if (queue == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ACL not found for queue access-type " + acl + " for queue "
+ + queueName);
+ }
+ return false;
}
- return false;
+ return queue.hasAccess(acl, callerUGI);
+ } finally {
+ readLock.unlock();
}
- return queue.hasAccess(acl, callerUGI);
}
public AllocationConfiguration getAllocationConfiguration() {
@@ -1566,12 +1640,16 @@ public class FairScheduler extends
public void onReload(AllocationConfiguration queueInfo) {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
- synchronized (FairScheduler.this) {
+
+ writeLock.lock();
+ try {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
+ } finally {
+ writeLock.unlock();
}
}
}
@@ -1616,32 +1694,41 @@ public class FairScheduler extends
}
@Override
- public synchronized String moveApplication(ApplicationId appId,
+ public String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
- SchedulerApplication<FSAppAttempt> app = applications.get(appId);
- if (app == null) {
- throw new YarnException("App to be moved " + appId + " not found.");
- }
- FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
- // To serialize with FairScheduler#allocate, synchronize on app attempt
- synchronized (attempt) {
- FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
- String destQueueName = handleMoveToPlanQueue(queueName);
- FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
- if (targetQueue == null) {
- throw new YarnException("Target queue " + queueName
- + " not found or is not a leaf queue.");
- }
- if (targetQueue == oldQueue) {
- return oldQueue.getQueueName();
+ try {
+ writeLock.lock();
+ SchedulerApplication<FSAppAttempt> app = applications.get(appId);
+ if (app == null) {
+ throw new YarnException("App to be moved " + appId + " not found.");
}
-
- if (oldQueue.isRunnableApp(attempt)) {
- verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+ FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
+ // To serialize with FairScheduler#allocate, take the app attempt's write lock
+
+ try {
+ attempt.getWriteLock().lock();
+ FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
+ String destQueueName = handleMoveToPlanQueue(queueName);
+ FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
+ if (targetQueue == null) {
+ throw new YarnException("Target queue " + queueName
+ + " not found or is not a leaf queue.");
+ }
+ if (targetQueue == oldQueue) {
+ return oldQueue.getQueueName();
+ }
+
+ if (oldQueue.isRunnableApp(attempt)) {
+ verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
+ }
+
+ executeMove(app, attempt, oldQueue, targetQueue);
+ return targetQueue.getQueueName();
+ } finally {
+ attempt.getWriteLock().unlock();
}
-
- executeMove(app, attempt, oldQueue, targetQueue);
- return targetQueue.getQueueName();
+ } finally {
+ writeLock.unlock();
}
}
@@ -1737,12 +1824,17 @@ public class FairScheduler extends
* Process resource update on a node and update Queue.
*/
@Override
- public synchronized void updateNodeResource(RMNode nm,
+ public void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
- super.updateNodeResource(nm, resourceOption);
- updateRootQueueMetrics();
- queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
- queueMgr.getRootQueue().recomputeSteadyShares();
+ try {
+ writeLock.lock();
+ super.updateNodeResource(nm, resourceOption);
+ updateRootQueueMetrics();
+ queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
+ queueMgr.getRootQueue().recomputeSteadyShares();
+ } finally {
+ writeLock.unlock();
+ }
}
/** {@inheritDoc} */
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org