You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2013/12/05 04:30:18 UTC

svn commit: r1548007 [1/2] - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-ya...

Author: sandy
Date: Thu Dec  5 03:30:17 2013
New Revision: 1548007

URL: http://svn.apache.org/r1548007
Log:
YARN-1403. Separate out configuration loading from QueueManager in the Fair Scheduler (Sandy Ryza)

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
      - copied unchanged from r1548006, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
      - copied unchanged from r1548006, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
      - copied unchanged from r1548006, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Dec  5 03:30:17 2013
@@ -117,6 +117,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where
     possible (Sebastian Wong via Sandy Ryza)
 
+    YARN-1403. Separate out configuration loading from QueueManager in the Fair
+    Scheduler (Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu Dec  5 03:30:17 2013
@@ -46,19 +46,15 @@ public class FSLeafQueue extends FSQueue
   private final List<AppSchedulable> nonRunnableAppScheds =
       new ArrayList<AppSchedulable>();
   
-  private final FairScheduler scheduler;
-  private final QueueManager queueMgr;
   private Resource demand = Resources.createResource(0);
   
   // Variables used for preemption
   private long lastTimeAtMinShare;
   private long lastTimeAtHalfFairShare;
   
-  public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+  public FSLeafQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
-    super(name, queueMgr, scheduler, parent);
-    this.scheduler = scheduler;
-    this.queueMgr = queueMgr;
+    super(name, scheduler, parent);
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
     this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
   }
@@ -145,7 +141,8 @@ public class FSLeafQueue extends FSQueue
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
-    Resource maxRes = queueMgr.getMaxResources(getName());
+    Resource maxRes = scheduler.getAllocationConfiguration()
+        .getMaxResources(getName());
     demand = Resources.createResource(0);
     for (AppSchedulable sched : runnableAppScheds) {
       if (Resources.equals(demand, maxRes)) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Dec  5 03:30:17 2013
@@ -41,14 +41,12 @@ public class FSParentQueue extends FSQue
 
   private final List<FSQueue> childQueues = 
       new ArrayList<FSQueue>();
-  private final QueueManager queueMgr;
   private Resource demand = Resources.createResource(0);
   private int runnableApps;
   
-  public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+  public FSParentQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
-    super(name, queueMgr, scheduler, parent);
-    this.queueMgr = queueMgr;
+    super(name, scheduler, parent);
   }
   
   public void addChildQueue(FSQueue child) {
@@ -82,7 +80,8 @@ public class FSParentQueue extends FSQue
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
-    Resource maxRes = queueMgr.getMaxResources(getName());
+    Resource maxRes = scheduler.getAllocationConfiguration()
+        .getMaxResources(getName());
     demand = Resources.createResource(0);
     for (FSQueue childQueue : childQueues) {
       childQueue.updateDemand();
@@ -164,8 +163,8 @@ public class FSParentQueue extends FSQue
   public void setPolicy(SchedulingPolicy policy)
       throws AllocationConfigurationException {
     boolean allowed =
-        SchedulingPolicy.isApplicableTo(policy, (this == queueMgr
-            .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT
+        SchedulingPolicy.isApplicableTo(policy, (parent == null)
+            ? SchedulingPolicy.DEPTH_ROOT
             : SchedulingPolicy.DEPTH_INTERMEDIATE);
     if (!allowed) {
       throwPolicyDoesnotApplyException(policy);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Thu Dec  5 03:30:17 2013
@@ -39,20 +39,17 @@ import org.apache.hadoop.yarn.util.resou
 @Unstable
 public abstract class FSQueue extends Schedulable implements Queue {
   private final String name;
-  private final QueueManager queueMgr;
-  private final FairScheduler scheduler;
+  protected final FairScheduler scheduler;
   private final FSQueueMetrics metrics;
   
   protected final FSParentQueue parent;
   protected final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   
-  protected SchedulingPolicy policy = SchedulingPolicy.getDefault();
+  protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
 
-  public FSQueue(String name, QueueManager queueMgr, 
-      FairScheduler scheduler, FSParentQueue parent) {
+  public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
-    this.queueMgr = queueMgr;
     this.scheduler = scheduler;
     this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
     metrics.setMinShare(getMinShare());
@@ -88,17 +85,17 @@ public abstract class FSQueue extends Sc
 
   @Override
   public ResourceWeights getWeights() {
-    return queueMgr.getQueueWeight(getName());
+    return scheduler.getAllocationConfiguration().getQueueWeight(getName());
   }
   
   @Override
   public Resource getMinShare() {
-    return queueMgr.getMinResources(getName());
+    return scheduler.getAllocationConfiguration().getMinResources(getName());
   }
   
   @Override
   public Resource getMaxShare() {
-    return queueMgr.getMaxResources(getName());
+    return scheduler.getAllocationConfiguration().getMaxResources(getName());
   }
 
   @Override
@@ -148,13 +145,7 @@ public abstract class FSQueue extends Sc
   }
   
   public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
-    // Check if the leaf-queue allows access
-    if (queueMgr.getQueueAcl(getName(), acl).isUserAllowed(user)) {
-      return true;
-    }
-
-    // Check if parent-queue allows access
-    return parent != null && parent.hasAccess(acl, user);
+    return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
   }
   
   /**
@@ -181,7 +172,7 @@ public abstract class FSQueue extends Sc
    */
   protected boolean assignContainerPreCheck(FSSchedulerNode node) {
     if (!Resources.fitsIn(getResourceUsage(),
-        queueMgr.getMaxResources(getName()))
+        scheduler.getAllocationConfiguration().getMaxResources(getName()))
         || node.getReservedContainer() != null) {
       return false;
     }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Dec  5 03:30:17 2013
@@ -192,11 +192,16 @@ public class FairScheduler implements Re
 
   @VisibleForTesting
   final MaxRunningAppsEnforcer maxRunningEnforcer;
+
+  private AllocationFileLoaderService allocsLoader;
+  @VisibleForTesting
+  AllocationConfiguration allocConf;
   
   public FairScheduler() {
     clock = new SystemClock();
+    allocsLoader = new AllocationFileLoaderService();
     queueMgr = new QueueManager(this);
-    maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
+    maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
 
   private void validateConf(Configuration conf) {
@@ -275,7 +280,6 @@ public class FairScheduler implements Re
    * required resources per job.
    */
   protected synchronized void update() {
-    queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
     updatePreemptionVariables(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
@@ -480,8 +484,8 @@ public class FairScheduler implements Re
    */
   protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
     String queue = sched.getName();
-    long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
-    long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
+    long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
+    long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
@@ -650,8 +654,8 @@ public class FairScheduler implements Re
   FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
     FSLeafQueue queue = null;
     try {
-      QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
-      queueName = policy.assignAppToQueue(queueName, user);
+      QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
+      queueName = placementPolicy.assignAppToQueue(queueName, user);
       if (queueName == null) {
         return null;
       }
@@ -1128,27 +1132,27 @@ public class FairScheduler implements Re
   @Override
   public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
-    this.conf = new FairSchedulerConfiguration(conf);
-    validateConf(this.conf);
-    minimumAllocation = this.conf.getMinimumAllocation();
-    maximumAllocation = this.conf.getMaximumAllocation();
-    incrAllocation = this.conf.getIncrementAllocation();
-    continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
-    continuousSchedulingSleepMs =
-            this.conf.getContinuousSchedulingSleepMs();
-    nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
-    rackLocalityThreshold = this.conf.getLocalityThresholdRack();
-    nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
-    rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
-    preemptionEnabled = this.conf.getPreemptionEnabled();
-    assignMultiple = this.conf.getAssignMultiple();
-    maxAssign = this.conf.getMaxAssign();
-    sizeBasedWeight = this.conf.getSizeBasedWeight();
-    preemptionInterval = this.conf.getPreemptionInterval();
-    waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-    usePortForNodeName = this.conf.getUsePortForNodeName();
-    
     if (!initialized) {
+      this.conf = new FairSchedulerConfiguration(conf);
+      validateConf(this.conf);
+      minimumAllocation = this.conf.getMinimumAllocation();
+      maximumAllocation = this.conf.getMaximumAllocation();
+      incrAllocation = this.conf.getIncrementAllocation();
+      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+      continuousSchedulingSleepMs =
+              this.conf.getContinuousSchedulingSleepMs();
+      nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+      rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+      nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+      rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+      preemptionEnabled = this.conf.getPreemptionEnabled();
+      assignMultiple = this.conf.getAssignMultiple();
+      maxAssign = this.conf.getMaxAssign();
+      sizeBasedWeight = this.conf.getSizeBasedWeight();
+      preemptionInterval = this.conf.getPreemptionInterval();
+      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+      usePortForNodeName = this.conf.getUsePortForNodeName();
+      
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
       this.eventLog = new FairSchedulerEventLog();
@@ -1156,8 +1160,9 @@ public class FairScheduler implements Re
 
       initialized = true;
 
+      allocConf = new AllocationConfiguration(conf);
       try {
-        queueMgr.initialize();
+        queueMgr.initialize(conf);
       } catch (Exception e) {
         throw new IOException("Failed to start FairScheduler", e);
       }
@@ -1181,12 +1186,24 @@ public class FairScheduler implements Re
         schedulingThread.setDaemon(true);
         schedulingThread.start();
       }
-    } else {
+      
+      allocsLoader.init(conf);
+      allocsLoader.setReloadListener(new AllocationReloadListener());
+      // If we fail to load allocations file on initialize, we want to fail
+      // immediately.  After a successful load, exceptions on future reloads
+      // will just result in leaving things as they are.
       try {
-        queueMgr.reloadAllocs();
+        allocsLoader.reloadAllocations();
       } catch (Exception e) {
         throw new IOException("Failed to initialize FairScheduler", e);
       }
+      allocsLoader.start();
+    } else {
+      try {
+        allocsLoader.reloadAllocations();
+      } catch (Exception e) {
+        LOG.error("Failed to reload allocations file", e);
+      }
     }
   }
 
@@ -1230,5 +1247,24 @@ public class FairScheduler implements Re
     }
     return queue.hasAccess(acl, callerUGI);
   }
+  
+  public AllocationConfiguration getAllocationConfiguration() {
+    return allocConf;
+  }
+  
+  private class AllocationReloadListener implements
+      AllocationFileLoaderService.Listener {
+
+    @Override
+    public void onReload(AllocationConfiguration queueInfo) {
+      // Commit the reload; also create any queue defined in the alloc file
+      // if it does not already exist, so it can be displayed on the web UI.
+      synchronized (FairScheduler.this) {
+        allocConf = queueInfo;
+        allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+        queueMgr.updateAllocationConfiguration(allocConf);
+      }
+    }
+  }
 
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu Dec  5 03:30:17 2013
@@ -18,7 +18,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -151,14 +152,6 @@ public class FairSchedulerConfiguration 
     return Resources.createResource(incrementMemory, incrementCores);
   }
   
-  public boolean getAllowUndeclaredPools() {
-    return getBoolean(ALLOW_UNDECLARED_POOLS, DEFAULT_ALLOW_UNDECLARED_POOLS);
-  }
-
-  public boolean getUserAsDefaultQueue() {
-    return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
-  }
-
   public float getLocalityThresholdNode() {
     return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
   }
@@ -199,30 +192,6 @@ public class FairSchedulerConfiguration 
     return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
   }
 
-  /**
-   * Path to XML file containing allocations. If the
-   * path is relative, it is searched for in the
-   * classpath, but loaded like a regular File.
-   */
-  public File getAllocationFile() {
-    String allocFilePath = get(ALLOCATION_FILE, DEFAULT_ALLOCATION_FILE);
-    File allocFile = new File(allocFilePath);
-    if (!allocFile.isAbsolute()) {
-      URL url = Thread.currentThread().getContextClassLoader()
-          .getResource(allocFilePath);
-      if (url == null) {
-        LOG.warn(allocFilePath + " not found on the classpath.");
-        allocFile = null;
-      } else if (!url.getProtocol().equalsIgnoreCase("file")) {
-        throw new RuntimeException("Allocation file " + url
-            + " found on the classpath is not on the local filesystem.");
-      } else {
-        allocFile = new File(url.getPath());
-      }
-    }
-    return allocFile;
-  }
-
   public String getEventlogDir() {
     return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Thu Dec  5 03:30:17 2013
@@ -33,15 +33,15 @@ import com.google.common.collect.ListMul
  * constraints
  */
 public class MaxRunningAppsEnforcer {
-  private final QueueManager queueMgr;
+  private final FairScheduler scheduler;
 
   // Tracks the number of running applications by user.
   private final Map<String, Integer> usersNumRunnableApps;
   @VisibleForTesting
   final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
 
-  public MaxRunningAppsEnforcer(QueueManager queueMgr) {
-    this.queueMgr = queueMgr;
+  public MaxRunningAppsEnforcer(FairScheduler scheduler) {
+    this.scheduler = scheduler;
     this.usersNumRunnableApps = new HashMap<String, Integer>();
     this.usersNonRunnableApps = ArrayListMultimap.create();
   }
@@ -51,16 +51,17 @@ public class MaxRunningAppsEnforcer {
    * maxRunningApps limits.
    */
   public boolean canAppBeRunnable(FSQueue queue, String user) {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     Integer userNumRunnable = usersNumRunnableApps.get(user);
     if (userNumRunnable == null) {
       userNumRunnable = 0;
     }
-    if (userNumRunnable >= queueMgr.getUserMaxApps(user)) {
+    if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
       return false;
     }
     // Check queue and all parent queues
     while (queue != null) {
-      int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName());
+      int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
       if (queue.getNumRunnableApps() >= queueMaxApps) {
         return false;
       }
@@ -107,6 +108,8 @@ public class MaxRunningAppsEnforcer {
    * highest queue that went from having no slack to having slack.
    */
   public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    
     // Update usersRunnableApps
     String user = app.getUser();
     int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
@@ -127,10 +130,10 @@ public class MaxRunningAppsEnforcer {
     // that was at its maxRunningApps before the removal.
     FSLeafQueue queue = app.getQueue();
     FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
-        queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
+        allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
     FSParentQueue parent = queue.getParent();
     while (parent != null) {
-      if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent
+      if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent
           .getName())) {
         highestQueueWithAppsNowRunnable = parent;
       }
@@ -149,7 +152,7 @@ public class MaxRunningAppsEnforcer {
       gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
           appsNowMaybeRunnable);
     }
-    if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) {
+    if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) {
       List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
       if (userWaitingApps != null) {
         appsNowMaybeRunnable.add(userWaitingApps);
@@ -200,7 +203,8 @@ public class MaxRunningAppsEnforcer {
    */
   private void gatherPossiblyRunnableAppLists(FSQueue queue,
       List<List<AppSchedulable>> appLists) {
-    if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) {
+    if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration()
+        .getQueueMaxApps(queue.getName())) {
       if (queue instanceof FSLeafQueue) {
         appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables());
       } else {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Dec  5 03:30:17 2013
@@ -18,20 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URL;
-import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.commons.logging.Log;
@@ -39,21 +33,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
@@ -67,37 +49,13 @@ public class QueueManager {
 
   public static final String ROOT_QUEUE = "root";
   
-  /** Time to wait between checks of the allocation file */
-  public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
-
-  /**
-   * Time to wait after the allocation has been modified before reloading it
-   * (this is done to prevent loading a file that hasn't been fully written).
-   */
-  public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
-  
-  private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
-  private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
-
   private final FairScheduler scheduler;
 
-  // Path to XML file containing allocations. 
-  private File allocFile; 
-
   private final Collection<FSLeafQueue> leafQueues = 
       new CopyOnWriteArrayList<FSLeafQueue>();
   private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
   private FSParentQueue rootQueue;
 
-  @VisibleForTesting
-  volatile QueueManagerInfo info = new QueueManagerInfo();
-  @VisibleForTesting
-  volatile QueuePlacementPolicy placementPolicy;
-  
-  private long lastReloadAttempt; // Last time we tried to reload the queues file
-  private long lastSuccessfulReload; // Last time we successfully reloaded queues
-  private boolean lastReloadAttemptFailed = false;
-  
   public QueueManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
@@ -106,45 +64,15 @@ public class QueueManager {
     return rootQueue;
   }
 
-  public void initialize() throws IOException, SAXException,
-      AllocationConfigurationException, ParserConfigurationException {
-    FairSchedulerConfiguration conf = scheduler.getConf();
-    rootQueue = new FSParentQueue("root", this, scheduler, null);
+  public void initialize(Configuration conf) throws IOException,
+      SAXException, AllocationConfigurationException, ParserConfigurationException {
+    rootQueue = new FSParentQueue("root", scheduler, null);
     queues.put(rootQueue.getName(), rootQueue);
     
-    this.allocFile = conf.getAllocationFile();
-    placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
-        new HashSet<String>(), conf);
-    
-    reloadAllocs();
-    lastSuccessfulReload = scheduler.getClock().getTime();
-    lastReloadAttempt = scheduler.getClock().getTime();
     // Create the default queue
     getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
   }
   
-  public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
-    
-  }
-  
-  /**
-   * Construct simple queue placement policy from allow-undeclared-pools and
-   * user-as-default-queue.
-   */
-  private List<QueuePlacementRule> getSimplePlacementRules() {
-    boolean create = scheduler.getConf().getAllowUndeclaredPools();
-    boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
-    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
-    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
-    if (userAsDefaultQueue) {
-      rules.add(new QueuePlacementRule.User().initialize(create, null));
-    }
-    if (!userAsDefaultQueue || !create) {
-      rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    }
-    return rules;
-  }
-  
   /**
    * Get a queue by name, creating it if the create param is true and is necessary.
    * If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -213,17 +141,30 @@ public class QueueManager {
     // queue to create.
     // Now that we know everything worked out, make all the queues
     // and add them to the map.
+    AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
     FSLeafQueue leafQueue = null;
     for (int i = newQueueNames.size()-1; i >= 0; i--) {
       String queueName = newQueueNames.get(i);
       if (i == 0) {
         // First name added was the leaf queue
-        leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+        leafQueue = new FSLeafQueue(name, scheduler, parent);
+        try {
+          leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
+        } catch (AllocationConfigurationException ex) {
+          LOG.warn("Failed to set default scheduling policy "
+              + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
+        }
         parent.addChildQueue(leafQueue);
         queues.put(leafQueue.getName(), leafQueue);
         leafQueues.add(leafQueue);
       } else {
-        FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+        FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
+        try {
+          newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
+        } catch (AllocationConfigurationException ex) {
+          LOG.warn("Failed to set default scheduling policy "
+              + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
+        }
         parent.addChildQueue(newParent);
         queues.put(newParent.getName(), newParent);
         parent = newParent;
@@ -257,301 +198,6 @@ public class QueueManager {
     }
   }
   
-  public QueuePlacementPolicy getPlacementPolicy() {
-    return placementPolicy;
-  }
-
-  /**
-   * Reload allocations file if it hasn't been loaded in a while
-   */
-  public void reloadAllocsIfNecessary() {
-    long time = scheduler.getClock().getTime();
-    if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
-      lastReloadAttempt = time;
-      if (null == allocFile) {
-        return;
-      }
-      try {
-        // Get last modified time of alloc file depending whether it's a String
-        // (for a path name) or an URL (for a classloader resource)
-        long lastModified = allocFile.lastModified();
-        if (lastModified > lastSuccessfulReload &&
-            time > lastModified + ALLOC_RELOAD_WAIT) {
-          reloadAllocs();
-          lastSuccessfulReload = time;
-          lastReloadAttemptFailed = false;
-        }
-      } catch (Exception e) {
-        // Throwing the error further out here won't help - the RPC thread
-        // will catch it and report it in a loop. Instead, just log it and
-        // hope somebody will notice from the log.
-        // We log the error only on the first failure so we don't fill up the
-        // JobTracker's log with these messages.
-        if (!lastReloadAttemptFailed) {
-          LOG.error("Failed to reload fair scheduler config file - " +
-              "will use existing allocations.", e);
-        }
-        lastReloadAttemptFailed = true;
-      }
-    }
-  }
-
-  /**
-   * Updates the allocation list from the allocation config file. This file is
-   * expected to be in the XML format specified in the design doc.
-   *
-   * @throws IOException if the config file cannot be read.
-   * @throws AllocationConfigurationException if allocations are invalid.
-   * @throws ParserConfigurationException if XML parser is misconfigured.
-   * @throws SAXException if config file is malformed.
-   */
-  public void reloadAllocs() throws IOException, ParserConfigurationException,
-      SAXException, AllocationConfigurationException {
-    if (allocFile == null) return;
-    // Create some temporary hashmaps to hold the new allocs, and we only save
-    // them in our fields if we have parsed the entire allocs file successfully.
-    Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
-    Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
-    Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
-    Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
-    Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
-    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
-    Map<String, Map<QueueACL, AccessControlList>> queueAcls =
-        new HashMap<String, Map<QueueACL, AccessControlList>>();
-    int userMaxAppsDefault = Integer.MAX_VALUE;
-    int queueMaxAppsDefault = Integer.MAX_VALUE;
-    long fairSharePreemptionTimeout = Long.MAX_VALUE;
-    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-    SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
-    
-    QueuePlacementPolicy newPlacementPolicy = null;
-
-    // Remember all queue names so we can display them on web UI, etc.
-    List<String> queueNamesInAllocFile = new ArrayList<String>();
-
-    // Read and parse the allocations file.
-    DocumentBuilderFactory docBuilderFactory =
-      DocumentBuilderFactory.newInstance();
-    docBuilderFactory.setIgnoringComments(true);
-    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(allocFile);
-    Element root = doc.getDocumentElement();
-    if (!"allocations".equals(root.getTagName()))
-      throw new AllocationConfigurationException("Bad fair scheduler config " +
-          "file: top-level element not <allocations>");
-    NodeList elements = root.getChildNodes();
-    List<Element> queueElements = new ArrayList<Element>();
-    Element placementPolicyElement = null;
-    for (int i = 0; i < elements.getLength(); i++) {
-      Node node = elements.item(i);
-      if (node instanceof Element) {
-        Element element = (Element)node;
-        if ("queue".equals(element.getTagName()) ||
-      	  "pool".equals(element.getTagName())) {
-          queueElements.add(element);
-        } else if ("user".equals(element.getTagName())) {
-          String userName = element.getAttribute("name");
-          NodeList fields = element.getChildNodes();
-          for (int j = 0; j < fields.getLength(); j++) {
-            Node fieldNode = fields.item(j);
-            if (!(fieldNode instanceof Element))
-              continue;
-            Element field = (Element) fieldNode;
-            if ("maxRunningApps".equals(field.getTagName())) {
-              String text = ((Text)field.getFirstChild()).getData().trim();
-              int val = Integer.parseInt(text);
-              userMaxApps.put(userName, val);
-            }
-          }
-        } else if ("userMaxAppsDefault".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          userMaxAppsDefault = val;
-        } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          long val = Long.parseLong(text) * 1000L;
-          fairSharePreemptionTimeout = val;
-        } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          long val = Long.parseLong(text) * 1000L;
-          defaultMinSharePreemptionTimeout = val;
-        } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          queueMaxAppsDefault = val;
-        } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
-            || "defaultQueueSchedulingMode".equals(element.getTagName())) {
-          String text = ((Text)element.getFirstChild()).getData().trim();
-          SchedulingPolicy.setDefault(text);
-          defaultSchedPolicy = SchedulingPolicy.getDefault();
-        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
-          placementPolicyElement = element;
-        } else {
-          LOG.warn("Bad element in allocations file: " + element.getTagName());
-        }
-      }
-    }
-    
-    // Load queue elements.  A root queue can either be included or omitted.  If
-    // it's included, all other queues must be inside it.
-    for (Element element : queueElements) {
-      String parent = "root";
-      if (element.getAttribute("name").equalsIgnoreCase("root")) {
-        if (queueElements.size() > 1) {
-          throw new AllocationConfigurationException("If configuring root queue,"
-          		+ " no other queues can be placed alongside it.");
-        }
-        parent = null;
-      }
-      loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
-          userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
-          queueAcls, queueNamesInAllocFile);
-    }
-    
-    // Load placement policy and pass it configured queues
-    if (placementPolicyElement != null) {
-      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
-          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
-    } else {
-      newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
-          new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
-    }
-
-    // Commit the reload; also create any queue defined in the alloc file
-    // if it does not already exist, so it can be displayed on the web UI.
-    synchronized (this) {
-      info = new QueueManagerInfo(minQueueResources, maxQueueResources,
-          queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-          queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
-          queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
-      placementPolicy = newPlacementPolicy;
-      
-      // Make sure all queues exist
-      for (String name: queueNamesInAllocFile) {
-        getLeafQueue(name, true);
-      }
-      
-      for (FSQueue queue : queues.values()) {
-        // Update queue metrics
-        FSQueueMetrics queueMetrics = queue.getMetrics();
-        queueMetrics.setMinShare(queue.getMinShare());
-        queueMetrics.setMaxShare(queue.getMaxShare());
-        // Set scheduling policies
-        if (queuePolicies.containsKey(queue.getName())) {
-          queue.setPolicy(queuePolicies.get(queue.getName()));
-        } else {
-          queue.setPolicy(SchedulingPolicy.getDefault());
-        }
-      }
- 
-    }
-  }
-  
-  /**
-   * Loads a queue from a queue element in the configuration file
-   */
-  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
-      Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
-      Map<String, SchedulingPolicy> queuePolicies,
-      Map<String, Long> minSharePreemptionTimeouts,
-      Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
-      throws AllocationConfigurationException {
-    String queueName = element.getAttribute("name");
-    if (parentName != null) {
-      queueName = parentName + "." + queueName;
-    }
-    Map<QueueACL, AccessControlList> acls =
-        new HashMap<QueueACL, AccessControlList>();
-    NodeList fields = element.getChildNodes();
-    boolean isLeaf = true;
-
-    for (int j = 0; j < fields.getLength(); j++) {
-      Node fieldNode = fields.item(j);
-      if (!(fieldNode instanceof Element))
-        continue;
-      Element field = (Element) fieldNode;
-      if ("minResources".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
-        minQueueResources.put(queueName, val);
-      } else if ("maxResources".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
-        maxQueueResources.put(queueName, val);
-      } else if ("maxRunningApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        int val = Integer.parseInt(text);
-        queueMaxApps.put(queueName, val);
-      } else if ("weight".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        double val = Double.parseDouble(text);
-        queueWeights.put(queueName, new ResourceWeights((float)val));
-      } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        long val = Long.parseLong(text) * 1000L;
-        minSharePreemptionTimeouts.put(queueName, val);
-      } else if ("schedulingPolicy".equals(field.getTagName())
-          || "schedulingMode".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData().trim();
-        SchedulingPolicy policy = SchedulingPolicy.parse(text);
-        policy.initialize(scheduler.getClusterCapacity());
-        queuePolicies.put(queueName, policy);
-      } else if ("aclSubmitApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData();
-        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
-      } else if ("aclAdministerApps".equals(field.getTagName())) {
-        String text = ((Text)field.getFirstChild()).getData();
-        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
-      } else if ("queue".endsWith(field.getTagName()) || 
-          "pool".equals(field.getTagName())) {
-        loadQueue(queueName, field, minQueueResources, maxQueueResources,
-            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
-            minSharePreemptionTimeouts,
-            queueAcls, queueNamesInAllocFile);
-        isLeaf = false;
-      }
-    }
-    if (isLeaf) {
-      queueNamesInAllocFile.add(queueName);
-    }
-    queueAcls.put(queueName, acls);
-    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
-        && !Resources.fitsIn(minQueueResources.get(queueName),
-            maxQueueResources.get(queueName))) {
-      LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
-          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
-    }
-  }
-
-  /**
-   * Get the minimum resource allocation for the given queue.
-   * @return the cap set on this queue, or 0 if not set.
-   */
-  public Resource getMinResources(String queue) {
-    Resource minQueueResource = info.minQueueResources.get(queue);
-    if (minQueueResource != null) {
-      return minQueueResource;
-    } else {
-      return Resources.createResource(0);
-    }
-  }
-
-  /**
-   * Get the maximum resource allocation for the given queue.
-   * @return the cap set on this queue, or Integer.MAX_VALUE if not set.
-   */
-
-  public Resource getMaxResources(String queueName) {
-    Resource maxQueueResource = info.maxQueueResources.get(queueName);
-    if (maxQueueResource != null) {
-      return maxQueueResource;
-    } else {
-      return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE);
-    }
-  }
-
   /**
    * Get a collection of all leaf queues
    */
@@ -567,141 +213,27 @@ public class QueueManager {
   public Collection<FSQueue> getQueues() {
     return queues.values();
   }
-
-  public int getUserMaxApps(String user) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.userMaxApps.containsKey(user)) {
-      return info.userMaxApps.get(user);
-    } else {
-      return info.userMaxAppsDefault;
-    }
-  }
-
-  public int getQueueMaxApps(String queue) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.queueMaxApps.containsKey(queue)) {
-      return info.queueMaxApps.get(queue);
-    } else {
-      return info.queueMaxAppsDefault;
-    }
-  }
   
-  public ResourceWeights getQueueWeight(String queue) {
-    ResourceWeights weight = info.queueWeights.get(queue);
-    if (weight != null) {
-      return weight;
-    } else {
-      return ResourceWeights.NEUTRAL;
-    }
-  }
-
-  /**
-   * Get a queue's min share preemption timeout, in milliseconds. This is the
-   * time after which jobs in the queue may kill other queues' tasks if they
-   * are below their min share.
-   */
-  public long getMinSharePreemptionTimeout(String queueName) {
-    // save current info in case it gets changed under us
-    QueueManagerInfo info = this.info;
-    if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
-      return info.minSharePreemptionTimeouts.get(queueName);
-    }
-    return info.defaultMinSharePreemptionTimeout;
-  }
-  
-  /**
-   * Get the fair share preemption, in milliseconds. This is the time
-   * after which any job may kill other jobs' tasks if it is below half
-   * its fair share.
-   */
-  public long getFairSharePreemptionTimeout() {
-    return info.fairSharePreemptionTimeout;
-  }
-
-  /**
-   * Get the ACLs associated with this queue. If a given ACL is not explicitly
-   * configured, include the default value for that ACL.  The default for the
-   * root queue is everybody ("*") and the default for all other queues is
-   * nobody ("")
-   */
-  public AccessControlList getQueueAcl(String queue, QueueACL operation) {
-    Map<QueueACL, AccessControlList> queueAcls = info.queueAcls.get(queue);
-    if (queueAcls == null || !queueAcls.containsKey(operation)) {
-      return (queue.equals(ROOT_QUEUE)) ? EVERYBODY_ACL : NOBODY_ACL;
-    }
-    return queueAcls.get(operation);
-  }
-  
-  static class QueueManagerInfo {
-    // Minimum resource allocation for each queue
-    public final Map<String, Resource> minQueueResources;
-    // Maximum amount of resources per queue
-    public final Map<String, Resource> maxQueueResources;
-    // Sharing weights for each queue
-    public final Map<String, ResourceWeights> queueWeights;
-    
-    // Max concurrent running applications for each queue and for each user; in addition,
-    // for users that have no max specified, we use the userMaxJobsDefault.
-    public final Map<String, Integer> queueMaxApps;
-    public final Map<String, Integer> userMaxApps;
-    public final int userMaxAppsDefault;
-    public final int queueMaxAppsDefault;
-
-    // ACL's for each queue. Only specifies non-default ACL's from configuration.
-    public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
-
-    // Min share preemption timeout for each queue in seconds. If a job in the queue
-    // waits this long without receiving its guaranteed share, it is allowed to
-    // preempt other jobs' tasks.
-    public final Map<String, Long> minSharePreemptionTimeouts;
-
-    // Default min share preemption timeout for queues where it is not set
-    // explicitly.
-    public final long defaultMinSharePreemptionTimeout;
-
-    // Preemption timeout for jobs below fair share in seconds. If a job remains
-    // below half its fair share for this long, it is allowed to preempt tasks.
-    public final long fairSharePreemptionTimeout;
-
-    public final SchedulingPolicy defaultSchedulingPolicy;
-    
-    public QueueManagerInfo(Map<String, Resource> minQueueResources, 
-        Map<String, Resource> maxQueueResources, 
-        Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-        Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
-        int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, 
-        Map<String, Long> minSharePreemptionTimeouts, 
-        Map<String, Map<QueueACL, AccessControlList>> queueAcls,
-        long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
-      this.minQueueResources = minQueueResources;
-      this.maxQueueResources = maxQueueResources;
-      this.queueMaxApps = queueMaxApps;
-      this.userMaxApps = userMaxApps;
-      this.queueWeights = queueWeights;
-      this.userMaxAppsDefault = userMaxAppsDefault;
-      this.queueMaxAppsDefault = queueMaxAppsDefault;
-      this.defaultSchedulingPolicy = defaultSchedulingPolicy;
-      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
-      this.queueAcls = queueAcls;
-      this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
-      this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+  public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
+    // Make sure all queues exist
+    for (String name : queueConf.getQueueNames()) {
+      getLeafQueue(name, true);
     }
     
-    public QueueManagerInfo() {
-      minQueueResources = new HashMap<String, Resource>();
-      maxQueueResources = new HashMap<String, Resource>();
-      queueWeights = new HashMap<String, ResourceWeights>();
-      queueMaxApps = new HashMap<String, Integer>();
-      userMaxApps = new HashMap<String, Integer>();
-      userMaxAppsDefault = Integer.MAX_VALUE;
-      queueMaxAppsDefault = Integer.MAX_VALUE;
-      queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
-      minSharePreemptionTimeouts = new HashMap<String, Long>();
-      defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-      fairSharePreemptionTimeout = Long.MAX_VALUE;
-      defaultSchedulingPolicy = SchedulingPolicy.getDefault();
+    for (FSQueue queue : queues.values()) {
+      // Update queue metrics
+      FSQueueMetrics queueMetrics = queue.getMetrics();
+      queueMetrics.setMinShare(queue.getMinShare());
+      queueMetrics.setMaxShare(queue.getMaxShare());
+      // Set scheduling policies
+      try {
+        SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
+        policy.initialize(scheduler.getClusterCapacity());
+        queue.setPolicy(policy);
+      } catch (AllocationConfigurationException ex) {
+        LOG.warn("Cannot apply configured scheduling policy to queue "
+            + queue.getName(), ex);
+      }
     }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Thu Dec  5 03:30:17 2013
@@ -95,6 +95,34 @@ public class QueuePlacementPolicy {
   }
   
   /**
+   * Build a simple queue placement policy from the allow-undeclared-pools and
+   * user-as-default-queue configuration options read from the given conf.
+   */
+  public static QueuePlacementPolicy fromConfiguration(Configuration conf,
+      Set<String> configuredQueues) {
+    boolean create = conf.getBoolean(
+        FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
+        FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
+    boolean userAsDefaultQueue = conf.getBoolean(
+        FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
+        FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
+    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    if (userAsDefaultQueue) {
+      rules.add(new QueuePlacementRule.User().initialize(create, null));
+    }
+    if (!userAsDefaultQueue || !create) {
+      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+    }
+    try {
+      return new QueuePlacementPolicy(rules, configuredQueues, conf);
+    } catch (AllocationConfigurationException ex) {
+      throw new RuntimeException("Should never hit exception when loading " +
+          "placement policy from conf", ex);
+    }
+  }
+
+  /**
    * Applies this rule to an app with the given requested queue and user/group
    * information.
    * 
@@ -120,4 +148,8 @@ public class QueuePlacementPolicy {
     throw new IllegalStateException("Should have applied a rule before " +
     		"reaching here");
   }
+  
+  public List<QueuePlacementRule> getRules() {
+    return rules;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Thu Dec  5 03:30:17 2013
@@ -35,7 +35,7 @@ public abstract class SchedulingPolicy {
   private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
       new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
 
-  private static SchedulingPolicy DEFAULT_POLICY =
+  public static final SchedulingPolicy DEFAULT_POLICY =
       getInstance(FairSharePolicy.class);
   
   public static final byte DEPTH_LEAF = (byte) 1;
@@ -44,15 +44,6 @@ public abstract class SchedulingPolicy {
   public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
   public static final byte DEPTH_ANY = (byte) 7;
 
-  public static SchedulingPolicy getDefault() {
-    return DEFAULT_POLICY;
-  }
-
-  public static void setDefault(String className)
-      throws AllocationConfigurationException {
-    DEFAULT_POLICY = parse(className);
-  }
-
   /**
    * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
    */

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Thu Dec  5 03:30:17 2013
@@ -29,10 +29,10 @@ import javax.xml.bind.annotation.XmlSeeA
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @XmlRootElement
@@ -65,7 +65,7 @@ public class FairSchedulerQueueInfo {  
   }
   
   public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
-    QueueManager manager = scheduler.getQueueManager();
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     
     queueName = queue.getName();
     schedulingPolicy = queue.getPolicy().getName();
@@ -87,7 +87,7 @@ public class FairSchedulerQueueInfo {  
     fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
     fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
     
-    maxApps = manager.getQueueMaxApps(queueName);
+    maxApps = allocConf.getQueueMaxApps(queueName);
     
     Collection<FSQueue> children = queue.getChildQueues();
     childQueues = new ArrayList<FairSchedulerQueueInfo>();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1548007&r1=1548006&r2=1548007&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Thu Dec  5 03:30:17 2013
@@ -51,11 +51,11 @@ public class TestFSLeafQueue {
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     
     String queueName = "root.queue1";
-    QueueManager mockMgr = mock(QueueManager.class);
-    when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
-    when(mockMgr.getMinResources(queueName)).thenReturn(Resources.none());
+    scheduler.allocConf = mock(AllocationConfiguration.class);
+    when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
+    when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
 
-    schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null);
+    schedulable = new FSLeafQueue(queueName, scheduler, null);
   }
 
   @Test