You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2012/08/08 20:53:29 UTC

svn commit: r1370889 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/

Author: acmurthy
Date: Wed Aug  8 18:53:29 2012
New Revision: 1370889

URL: http://svn.apache.org/viewvc?rev=1370889&view=rev
Log:
YARN-12. Fix findbugs warnings in FairScheduler. Contributed by Junping Du.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1370889&r1=1370888&r2=1370889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Aug  8 18:53:29 2012
@@ -32,3 +32,5 @@ Release 2.1.0-alpha - Unreleased 
 
   BUG FIXES
 
+    YARN-12. Fix findbugs warnings in FairScheduler. (Junping Du via acmurthy) 
+

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1370889&r1=1370888&r2=1370889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Wed Aug  8 18:53:29 2012
@@ -129,15 +129,17 @@ class FairSchedulerEventLog {
   /**
    * Flush and close the log.
    */
-  void shutdown() {
+  synchronized void shutdown() {
     try {
       if (appender != null)
         appender.close();
-    } catch (Exception e) {}
-    logDisabled = true;
+    } catch (Exception e) {
+      LOG.error("Failed to close fair scheduler event log", e);
+      logDisabled = true;
+    }
   }
 
-  boolean isEnabled() {
+  synchronized boolean isEnabled() {
     return !logDisabled;
   }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1370889&r1=1370888&r2=1370889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Wed Aug  8 18:53:29 2012
@@ -116,7 +116,37 @@ public class QueueManager {
   private long lastReloadAttempt; // Last time we tried to reload the queues file
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
   private boolean lastReloadAttemptFailed = false;
-
+  
+  // Monitor object for minQueueResources
+  private Object minQueueResourcesMO = new Object();
+  
+  //Monitor object for maxQueueResources
+  private Object maxQueueResourcesMO = new Object();
+  
+  //Monitor object for queueMaxApps
+  private Object queueMaxAppsMO = new Object();
+  
+  //Monitor object for userMaxApps
+  private Object userMaxAppsMO = new Object();
+  
+  //Monitor object for queueWeights
+  private Object queueWeightsMO = new Object();
+  
+  //Monitor object for minSharePreemptionTimeouts
+  private Object minSharePreemptionTimeoutsMO = new Object();
+  
+  //Monitor object for queueAcls
+  private Object queueAclsMO = new Object();
+  
+  //Monitor object for userMaxAppsDefault
+  private Object userMaxAppsDefaultMO = new Object();
+  
+  //Monitor object for queueMaxAppsDefault
+  private Object queueMaxAppsDefaultMO = new Object();
+  
+  //Monitor object for defaultSchedulingMode
+  private Object defaultSchedulingModeMO = new Object();
+  
   public QueueManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
@@ -145,21 +175,27 @@ public class QueueManager {
   /**
    * Get a queue by name, creating it if necessary
    */
-  public synchronized FSQueue getQueue(String name) {
-    FSQueue queue = queues.get(name);
-    if (queue == null) {
-      queue = new FSQueue(scheduler, name);
-      queue.setSchedulingMode(defaultSchedulingMode);
-      queues.put(name, queue);
+  public FSQueue getQueue(String name) {
+    synchronized (queues) {
+      FSQueue queue = queues.get(name);
+      if (queue == null) {
+        queue = new FSQueue(scheduler, name);
+        synchronized (defaultSchedulingModeMO){
+          queue.setSchedulingMode(defaultSchedulingMode);
+        }
+        queues.put(name, queue);
+      }
+      return queue;
     }
-    return queue;
   }
 
   /**
    * Return whether a queue exists already.
    */
-  public synchronized boolean exists(String name) {
-    return queues.containsKey(name);
+  public boolean exists(String name) {
+    synchronized (queues) {
+      return queues.containsKey(name);
+    }
   }
 
   /**
@@ -353,16 +389,16 @@ public class QueueManager {
     // Commit the reload; also create any queue defined in the alloc file
     // if it does not already exist, so it can be displayed on the web UI.
     synchronized(this) {
-      this.minQueueResources = minQueueResources;
-      this.maxQueueResources = maxQueueResources;
-      this.queueMaxApps = queueMaxApps;
-      this.userMaxApps = userMaxApps;
-      this.queueWeights = queueWeights;
-      this.userMaxAppsDefault = userMaxAppsDefault;
-      this.queueMaxAppsDefault = queueMaxAppsDefault;
-      this.defaultSchedulingMode = defaultSchedulingMode;
-      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
-      this.queueAcls = queueAcls;
+      setMinResources(minQueueResources);
+      setMaxResources(maxQueueResources);
+      setQueueMaxApps(queueMaxApps);
+      setUserMaxApps(userMaxApps);
+      setQueueWeights(queueWeights);
+      setUserMaxAppsDefault(userMaxAppsDefault);
+      setQueueMaxAppsDefault(queueMaxAppsDefault);
+      setDefaultSchedulingMode(defaultSchedulingMode);
+      setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
+      setQueueAcls(queueAcls);
       for (String name: queueNamesInAllocFile) {
         FSQueue queue = getQueue(name);
         if (queueModes.containsKey(name)) {
@@ -392,25 +428,40 @@ public class QueueManager {
    * @return the cap set on this queue, or 0 if not set.
    */
   public Resource getMinResources(String queue) {
-    if (minQueueResources.containsKey(queue)) {
-      return minQueueResources.get(queue);
-    } else{
-      return Resources.createResource(0);
+    synchronized(minQueueResourcesMO) {
+      if (minQueueResources.containsKey(queue)) {
+        return minQueueResources.get(queue);
+      } else{
+        return Resources.createResource(0);
+      }
     }
   }
 
+  private void setMinResources(Map<String, Resource> resources) {
+    synchronized(minQueueResourcesMO) {
+      minQueueResources = resources;
+    }
+  }
   /**
    * Get the maximum resource allocation for the given queue.
    * @return the cap set on this queue, or Integer.MAX_VALUE if not set.
    */
   Resource getMaxResources(String queueName) {
-    if (maxQueueResources.containsKey(queueName)) {
-      return maxQueueResources.get(queueName);
-    } else {
-      return Resources.createResource(Integer.MAX_VALUE);
+    synchronized (maxQueueResourcesMO) {
+      if (maxQueueResources.containsKey(queueName)) {
+        return maxQueueResources.get(queueName);
+      } else {
+        return Resources.createResource(Integer.MAX_VALUE);
+      }
     }
   }
 
+  private void setMaxResources(Map<String, Resource> resources) {
+    synchronized(maxQueueResourcesMO) {
+      maxQueueResources = resources;
+    }
+  }
+  
   /**
    * Add an app in the appropriate queue
    */
@@ -428,11 +479,12 @@ public class QueueManager {
   /**
    * Get a collection of all queues
    */
-  public synchronized Collection<FSQueue> getQueues() {
-    return queues.values();
+  public Collection<FSQueue> getQueues() {
+    synchronized (queues) {
+      return queues.values();
+    }
   }
 
-
   /**
    * Get all queue names that have been seen either in the allocation file or in
    * a submitted app.
@@ -447,40 +499,102 @@ public class QueueManager {
   }
 
   public int getUserMaxApps(String user) {
-    if (userMaxApps.containsKey(user)) {
-      return userMaxApps.get(user);
-    } else {
-      return userMaxAppsDefault;
+    synchronized (userMaxAppsMO) {
+      if (userMaxApps.containsKey(user)) {
+        return userMaxApps.get(user);
+      } else {
+        return getUserMaxAppsDefault();
+      }
     }
   }
 
+  private void setUserMaxApps(Map<String, Integer> userApps) {
+    synchronized (userMaxAppsMO) {
+      userMaxApps = userApps;
+    }
+  }
+  
+  private int getUserMaxAppsDefault() {
+    synchronized (userMaxAppsDefaultMO){
+      return userMaxAppsDefault;
+    }
+  }
+  
+  private void setUserMaxAppsDefault(int userMaxApps) {
+    synchronized (userMaxAppsDefaultMO){
+      userMaxAppsDefault = userMaxApps;
+    }
+  }
+  
   public int getQueueMaxApps(String queue) {
-    if (queueMaxApps.containsKey(queue)) {
-      return queueMaxApps.get(queue);
-    } else {
+    synchronized (queueMaxAppsMO) {
+      if (queueMaxApps.containsKey(queue)) {
+        return queueMaxApps.get(queue);
+      } else {
+        return getQueueMaxAppsDefault();
+      }
+    }
+  }
+  
+  private void setQueueMaxApps(Map<String, Integer> queueApps) {
+    synchronized (queueMaxAppsMO) {
+      queueMaxApps = queueApps;
+    }
+  }
+  
+  private int getQueueMaxAppsDefault(){
+    synchronized(queueMaxAppsDefaultMO) {
       return queueMaxAppsDefault;
     }
   }
+  
+  private void setQueueMaxAppsDefault(int queueMaxApps){
+    synchronized(queueMaxAppsDefaultMO) {
+      queueMaxAppsDefault = queueMaxApps;
+    }
+  }
+  
+  private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
+    synchronized(defaultSchedulingModeMO) {
+      defaultSchedulingMode = schedulingMode;
+    }
+  }
 
   public double getQueueWeight(String queue) {
-    if (queueWeights.containsKey(queue)) {
-      return queueWeights.get(queue);
-    } else {
-      return 1.0;
+    synchronized (queueWeightsMO) {
+      if (queueWeights.containsKey(queue)) {
+        return queueWeights.get(queue);
+      } else {
+        return 1.0;
+      }
     }
   }
 
+  private void setQueueWeights(Map<String, Double> weights) {
+    synchronized (queueWeightsMO) {
+      queueWeights = weights;
+    }
+  }
   /**
   * Get a queue's min share preemption timeout, in milliseconds. This is the
   * time after which jobs in the queue may kill other queues' tasks if they
   * are below their min share.
   */
   public long getMinSharePreemptionTimeout(String queueName) {
-    if (minSharePreemptionTimeouts.containsKey(queueName)) {
-      return minSharePreemptionTimeouts.get(queueName);
+    synchronized (minSharePreemptionTimeoutsMO) {
+      if (minSharePreemptionTimeouts.containsKey(queueName)) {
+        return minSharePreemptionTimeouts.get(queueName);
+      }
     }
     return defaultMinSharePreemptionTimeout;
   }
+  
+  private void setMinSharePreemptionTimeouts(
+      Map<String, Long> sharePreemptionTimeouts){
+    synchronized (minSharePreemptionTimeoutsMO) {
+      minSharePreemptionTimeouts = sharePreemptionTimeouts;
+    }
+  }
 
   /**
    * Get the fair share preemption, in milliseconds. This is the time
@@ -497,9 +611,10 @@ public class QueueManager {
    */
   public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
     HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
-
-    if (queueAcls.containsKey(queue)) {
-      out.putAll(queueAcls.get(queue));
+    synchronized (queueAclsMO) {
+      if (queueAcls.containsKey(queue)) {
+        out.putAll(queueAcls.get(queue));
+      }
     }
     if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
       out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@@ -509,4 +624,10 @@ public class QueueManager {
     }
     return out;
   }
+  
+  private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
+    synchronized (queueAclsMO) {
+      queueAcls = queue;
+    }
+  }
 }