You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/08/08 20:55:15 UTC
svn commit: r1370891 - in
/hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
Author: acmurthy
Date: Wed Aug 8 18:55:15 2012
New Revision: 1370891
URL: http://svn.apache.org/viewvc?rev=1370891&view=rev
Log:
Merge -c 1370889 from trunk to branch-2.1.0-alpha to fix YARN-12. Fix findbugs warnings in FairScheduler. Contributed by Junping Du.
Modified:
hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/CHANGES.txt?rev=1370891&r1=1370890&r2=1370891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/CHANGES.txt Wed Aug 8 18:55:15 2012
@@ -32,3 +32,5 @@ Release 2.1.0-alpha - Unreleased
BUG FIXES
+ YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy)
+
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1370891&r1=1370890&r2=1370891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Wed Aug 8 18:55:15 2012
@@ -129,15 +129,17 @@ class FairSchedulerEventLog {
/**
* Flush and close the log.
*/
- void shutdown() {
+ synchronized void shutdown() {
try {
if (appender != null)
appender.close();
- } catch (Exception e) {}
- logDisabled = true;
+ } catch (Exception e) {
+ LOG.error("Failed to close fair scheduler event log", e);
+ logDisabled = true;
+ }
}
- boolean isEnabled() {
+ synchronized boolean isEnabled() {
return !logDisabled;
}
}
Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1370891&r1=1370890&r2=1370891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Aug 8 18:55:15 2012
@@ -116,7 +116,37 @@ public class QueueManager {
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
-
+
+ // Monitor object for minQueueResources
+ private Object minQueueResourcesMO = new Object();
+
+ //Monitor object for maxQueueResources
+ private Object maxQueueResourcesMO = new Object();
+
+ //Monitor object for queueMaxApps
+ private Object queueMaxAppsMO = new Object();
+
+ //Monitor object for userMaxApps
+ private Object userMaxAppsMO = new Object();
+
+ //Monitor object for queueWeights
+ private Object queueWeightsMO = new Object();
+
+ //Monitor object for minSharePreemptionTimeouts
+ private Object minSharePreemptionTimeoutsMO = new Object();
+
+ //Monitor object for queueAcls
+ private Object queueAclsMO = new Object();
+
+ //Monitor object for userMaxAppsDefault
+ private Object userMaxAppsDefaultMO = new Object();
+
+ //Monitor object for queueMaxAppsDefault
+ private Object queueMaxAppsDefaultMO = new Object();
+
+ //Monitor object for defaultSchedulingMode
+ private Object defaultSchedulingModeMO = new Object();
+
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
@@ -145,21 +175,27 @@ public class QueueManager {
/**
* Get a queue by name, creating it if necessary
*/
- public synchronized FSQueue getQueue(String name) {
- FSQueue queue = queues.get(name);
- if (queue == null) {
- queue = new FSQueue(scheduler, name);
- queue.setSchedulingMode(defaultSchedulingMode);
- queues.put(name, queue);
+ public FSQueue getQueue(String name) {
+ synchronized (queues) {
+ FSQueue queue = queues.get(name);
+ if (queue == null) {
+ queue = new FSQueue(scheduler, name);
+ synchronized (defaultSchedulingModeMO){
+ queue.setSchedulingMode(defaultSchedulingMode);
+ }
+ queues.put(name, queue);
+ }
+ return queue;
}
- return queue;
}
/**
* Return whether a queue exists already.
*/
- public synchronized boolean exists(String name) {
- return queues.containsKey(name);
+ public boolean exists(String name) {
+ synchronized (queues) {
+ return queues.containsKey(name);
+ }
}
/**
@@ -353,16 +389,16 @@ public class QueueManager {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
synchronized(this) {
- this.minQueueResources = minQueueResources;
- this.maxQueueResources = maxQueueResources;
- this.queueMaxApps = queueMaxApps;
- this.userMaxApps = userMaxApps;
- this.queueWeights = queueWeights;
- this.userMaxAppsDefault = userMaxAppsDefault;
- this.queueMaxAppsDefault = queueMaxAppsDefault;
- this.defaultSchedulingMode = defaultSchedulingMode;
- this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
- this.queueAcls = queueAcls;
+ setMinResources(minQueueResources);
+ setMaxResources(maxQueueResources);
+ setQueueMaxApps(queueMaxApps);
+ setUserMaxApps(userMaxApps);
+ setQueueWeights(queueWeights);
+ setUserMaxAppsDefault(userMaxAppsDefault);
+ setQueueMaxAppsDefault(queueMaxAppsDefault);
+ setDefaultSchedulingMode(defaultSchedulingMode);
+ setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
+ setQueueAcls(queueAcls);
for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name);
if (queueModes.containsKey(name)) {
@@ -392,25 +428,40 @@ public class QueueManager {
* @return the cap set on this queue, or 0 if not set.
*/
public Resource getMinResources(String queue) {
- if (minQueueResources.containsKey(queue)) {
- return minQueueResources.get(queue);
- } else{
- return Resources.createResource(0);
+ synchronized(minQueueResourcesMO) {
+ if (minQueueResources.containsKey(queue)) {
+ return minQueueResources.get(queue);
+ } else{
+ return Resources.createResource(0);
+ }
}
}
+ private void setMinResources(Map<String, Resource> resources) {
+ synchronized(minQueueResourcesMO) {
+ minQueueResources = resources;
+ }
+ }
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
*/
Resource getMaxResources(String queueName) {
- if (maxQueueResources.containsKey(queueName)) {
- return maxQueueResources.get(queueName);
- } else {
- return Resources.createResource(Integer.MAX_VALUE);
+ synchronized (maxQueueResourcesMO) {
+ if (maxQueueResources.containsKey(queueName)) {
+ return maxQueueResources.get(queueName);
+ } else {
+ return Resources.createResource(Integer.MAX_VALUE);
+ }
}
}
+ private void setMaxResources(Map<String, Resource> resources) {
+ synchronized(maxQueueResourcesMO) {
+ maxQueueResources = resources;
+ }
+ }
+
/**
* Add an app in the appropriate queue
*/
@@ -428,11 +479,12 @@ public class QueueManager {
/**
* Get a collection of all queues
*/
- public synchronized Collection<FSQueue> getQueues() {
- return queues.values();
+ public Collection<FSQueue> getQueues() {
+ synchronized (queues) {
+ return queues.values();
+ }
}
-
/**
* Get all queue names that have been seen either in the allocation file or in
* a submitted app.
@@ -447,40 +499,102 @@ public class QueueManager {
}
public int getUserMaxApps(String user) {
- if (userMaxApps.containsKey(user)) {
- return userMaxApps.get(user);
- } else {
- return userMaxAppsDefault;
+ synchronized (userMaxAppsMO) {
+ if (userMaxApps.containsKey(user)) {
+ return userMaxApps.get(user);
+ } else {
+ return getUserMaxAppsDefault();
+ }
}
}
+ private void setUserMaxApps(Map<String, Integer> userApps) {
+ synchronized (userMaxAppsMO) {
+ userMaxApps = userApps;
+ }
+ }
+
+ private int getUserMaxAppsDefault() {
+ synchronized (userMaxAppsDefaultMO){
+ return userMaxAppsDefault;
+ }
+ }
+
+ private void setUserMaxAppsDefault(int userMaxApps) {
+ synchronized (userMaxAppsDefaultMO){
+ userMaxAppsDefault = userMaxApps;
+ }
+ }
+
public int getQueueMaxApps(String queue) {
- if (queueMaxApps.containsKey(queue)) {
- return queueMaxApps.get(queue);
- } else {
+ synchronized (queueMaxAppsMO) {
+ if (queueMaxApps.containsKey(queue)) {
+ return queueMaxApps.get(queue);
+ } else {
+ return getQueueMaxAppsDefault();
+ }
+ }
+ }
+
+ private void setQueueMaxApps(Map<String, Integer> queueApps) {
+ synchronized (queueMaxAppsMO) {
+ queueMaxApps = queueApps;
+ }
+ }
+
+ private int getQueueMaxAppsDefault(){
+ synchronized(queueMaxAppsDefaultMO) {
return queueMaxAppsDefault;
}
}
+
+ private void setQueueMaxAppsDefault(int queueMaxApps){
+ synchronized(queueMaxAppsDefaultMO) {
+ queueMaxAppsDefault = queueMaxApps;
+ }
+ }
+
+ private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
+ synchronized(defaultSchedulingModeMO) {
+ defaultSchedulingMode = schedulingMode;
+ }
+ }
public double getQueueWeight(String queue) {
- if (queueWeights.containsKey(queue)) {
- return queueWeights.get(queue);
- } else {
- return 1.0;
+ synchronized (queueWeightsMO) {
+ if (queueWeights.containsKey(queue)) {
+ return queueWeights.get(queue);
+ } else {
+ return 1.0;
+ }
}
}
+ private void setQueueWeights(Map<String, Double> weights) {
+ synchronized (queueWeightsMO) {
+ queueWeights = weights;
+ }
+ }
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
*/
public long getMinSharePreemptionTimeout(String queueName) {
- if (minSharePreemptionTimeouts.containsKey(queueName)) {
- return minSharePreemptionTimeouts.get(queueName);
+ synchronized (minSharePreemptionTimeoutsMO) {
+ if (minSharePreemptionTimeouts.containsKey(queueName)) {
+ return minSharePreemptionTimeouts.get(queueName);
+ }
}
return defaultMinSharePreemptionTimeout;
}
+
+ private void setMinSharePreemptionTimeouts(
+ Map<String, Long> sharePreemptionTimeouts){
+ synchronized (minSharePreemptionTimeoutsMO) {
+ minSharePreemptionTimeouts = sharePreemptionTimeouts;
+ }
+ }
/**
* Get the fair share preemption, in milliseconds. This is the time
@@ -497,9 +611,10 @@ public class QueueManager {
*/
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
-
- if (queueAcls.containsKey(queue)) {
- out.putAll(queueAcls.get(queue));
+ synchronized (queueAclsMO) {
+ if (queueAcls.containsKey(queue)) {
+ out.putAll(queueAcls.get(queue));
+ }
}
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@@ -509,4 +624,10 @@ public class QueueManager {
}
return out;
}
+
+ private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
+ synchronized (queueAclsMO) {
+ queueAcls = queue;
+ }
+ }
}