You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yu...@apache.org on 2019/03/26 05:49:32 UTC

[hadoop] branch trunk updated: YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.

This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5257f50  YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.
5257f50 is described below

commit 5257f50abb71905ef3068fd45541d00ce9e8f355
Author: yufei <yu...@apache.org>
AuthorDate: Mon Mar 25 22:47:24 2019 -0700

    YARN-8967. Change FairScheduler to use PlacementRule interface. Contributed by Wilfred Spiegelenburg.
---
 .../yarn/server/resourcemanager/RMAppManager.java  | 117 +++--
 .../resourcemanager/placement/FSPlacementRule.java |  19 +-
 .../placement/QueuePlacementRuleUtils.java         |   2 +-
 .../scheduler/fair/AllocationConfiguration.java    |  30 +-
 .../fair/AllocationFileLoaderService.java          |  28 +-
 .../scheduler/fair/FairScheduler.java              | 144 ++----
 .../scheduler/fair/QueueManager.java               |  15 +-
 .../scheduler/fair/QueuePlacementPolicy.java       | 344 ++++++++++----
 .../scheduler/fair/QueuePlacementRule.java         | 366 ---------------
 .../scheduler/fair/allocation/QueueProperties.java |   3 +-
 .../TestAppManagerWithFairScheduler.java           | 289 ++++++++----
 .../resourcemanager/TestApplicationACLs.java       |   2 +-
 .../reservation/TestSchedulerPlanFollowerBase.java |  12 +-
 .../scheduler/TestSchedulerUtils.java              |  10 +-
 .../scheduler/fair/FairSchedulerTestBase.java      |  21 +-
 .../fair/TestAllocationFileLoaderService.java      | 152 +++---
 .../scheduler/fair/TestAppRunnability.java         |  19 +-
 .../scheduler/fair/TestContinuousScheduling.java   |  30 +-
 .../scheduler/fair/TestFSAppAttempt.java           |   5 +-
 .../scheduler/fair/TestFSParentQueue.java          |  20 +-
 .../scheduler/fair/TestFairScheduler.java          | 295 ++++--------
 .../TestFairSchedulerWithMultiResourceTypes.java   |   9 +
 .../scheduler/fair/TestMaxRunningAppsEnforcer.java |  25 +-
 .../scheduler/fair/TestQueueManager.java           |  45 +-
 .../scheduler/fair/TestQueuePlacementPolicy.java   | 515 +++++++++++++++------
 .../scheduler/fair/TestSchedulingPolicy.java       |   9 +
 .../webapp/dao/TestFairSchedulerQueueInfo.java     |  24 +-
 27 files changed, 1336 insertions(+), 1214 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index b92c7f6..6e6c8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Times;
@@ -418,7 +420,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
 
     // We only replace the queue when it's a new application
     if (!isRecovery) {
-      replaceQueueFromPlacementContext(placementContext, submissionContext);
+      copyPlacementQueueToSubmissionContext(placementContext,
+          submissionContext);
 
       // fail the submission if configured application timeout value is invalid
       RMServerUtils.validateApplicationTimeouts(
@@ -443,38 +446,60 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       submissionContext.setPriority(appPriority);
     }
 
-    // Since FairScheduler queue mapping is done inside scheduler,
-    // if FairScheduler is used and the queue doesn't exist, we should not
-    // fail here because queue will be created inside FS. Ideally, FS queue
-    // mapping should be done outside scheduler too like CS.
-    // For now, exclude FS for the acl check.
-    if (!isRecovery && YarnConfiguration.isAclEnabled(conf)
-        && scheduler instanceof CapacityScheduler) {
-      String queueName = submissionContext.getQueue();
-      String appName = submissionContext.getApplicationName();
-      CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
-
-      if (csqueue == null && placementContext != null) {
-        //could be an auto created queue through queue mapping. Validate
-        // parent queue exists and has valid acls
-        String parentQueueName = placementContext.getParentQueue();
-        csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
-      }
+    if (!isRecovery && YarnConfiguration.isAclEnabled(conf)) {
+      if (scheduler instanceof CapacityScheduler) {
+        String queueName = submissionContext.getQueue();
+        String appName = submissionContext.getApplicationName();
+        CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
+
+        if (csqueue == null && placementContext != null) {
+          //could be an auto created queue through queue mapping. Validate
+          // parent queue exists and has valid acls
+          String parentQueueName = placementContext.getParentQueue();
+          csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
+        }
 
-      if (csqueue != null
-          && !authorizer.checkPermission(
-              new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
-                  SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
-                  applicationId.toString(), appName, Server.getRemoteAddress(),
-                  null))
-          && !authorizer.checkPermission(
-              new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
-                  SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
-                  applicationId.toString(), appName, Server.getRemoteAddress(),
-                  null))) {
-        throw RPCUtil.getRemoteException(new AccessControlException(
-            "User " + user + " does not have permission to submit "
-                + applicationId + " to queue " + submissionContext.getQueue()));
+        if (csqueue != null
+            && !authorizer.checkPermission(
+            new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+                SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
+                applicationId.toString(), appName, Server.getRemoteAddress(),
+                null))
+            && !authorizer.checkPermission(
+            new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
+                SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
+                applicationId.toString(), appName, Server.getRemoteAddress(),
+                null))) {
+          throw RPCUtil.getRemoteException(new AccessControlException(
+              "User " + user + " does not have permission to submit "
+                  + applicationId + " to queue "
+                  + submissionContext.getQueue()));
+        }
+      }
+      if (scheduler instanceof FairScheduler) {
+        // if we have not placed the app just skip this, the submit will be
+        // rejected in the scheduler.
+        if (placementContext != null) {
+          // The queue might not be created yet. Walk up the tree to check the
+          // parent ACL. The queueName starts at root, which always exists
+          String queueName = submissionContext.getQueue();
+          FSQueue queue = ((FairScheduler) scheduler).getQueueManager().
+              getQueue(queueName);
+          while (queue == null) {
+            int sepIndex = queueName.lastIndexOf(".");
+            queueName = queueName.substring(0, sepIndex);
+            queue = ((FairScheduler) scheduler).getQueueManager().
+                getQueue(queueName);
+          }
+          if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+              !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+            throw RPCUtil.getRemoteException(new AccessControlException(
+                "User " + user + " does not have permission to submit " +
+                    applicationId + " to queue " +
+                    submissionContext.getQueue() +
+                    " denied by ACL for queue " + queueName));
+          }
+        }
       }
     }
 
@@ -841,34 +866,38 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         // Placement could also fail if the user doesn't exist in system
         // skip if the user is not found during recovery.
         if (isRecovery) {
-          LOG.warn("PlaceApplication failed,skipping on recovery of rm");
+          LOG.warn("Application placement failed for user " + user +
+              " and application " + context.getApplicationId() +
+              ", skipping placement on recovery of rm", e);
           return placementContext;
         }
         throw e;
       }
     }
-    if (placementContext == null && (context.getQueue() == null) || context
-        .getQueue().isEmpty()) {
+    // The submission context when created often has a queue set. In case of
+    // the FairScheduler a null placement context is still considered as a
+    // failure, even when a queue is provided on submit. This case is handled
+    // in the scheduler.
+    if (placementContext == null && (context.getQueue() == null) ||
+        context.getQueue().isEmpty()) {
       String msg = "Failed to place application " + context.getApplicationId()
-          + " to queue and specified " + "queue is invalid : " + context
-          .getQueue();
+          + " in a queue and submit context queue is null or empty";
       LOG.error(msg);
       throw new YarnException(msg);
     }
     return placementContext;
   }
 
-  void replaceQueueFromPlacementContext(
+  private void copyPlacementQueueToSubmissionContext(
       ApplicationPlacementContext placementContext,
       ApplicationSubmissionContext context) {
-    // Set it to ApplicationSubmissionContext
-    //apply queue mapping only to new application submissions
+    // Set the queue from the placement in the ApplicationSubmissionContext
+    // Placement rules are only considered for new applications
     if (placementContext != null && !StringUtils.equalsIgnoreCase(
         context.getQueue(), placementContext.getQueue())) {
-      LOG.info("Placed application=" + context.getApplicationId() +
-          " to queue=" + placementContext.getQueue() + ", original queue="
-          + context
-          .getQueue());
+      LOG.info("Placed application with ID " + context.getApplicationId() +
+          " in queue: " + placementContext.getQueue() +
+          ", original submission queue was: " + context.getQueue());
       context.setQueue(placementContext.getQueue());
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
index 7e9e6ef..7ceb374 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/FSPlacementRule.java
@@ -58,10 +58,12 @@ public abstract class FSPlacementRule extends PlacementRule {
   }
 
   /**
-   * Set a rule to generate the parent queue dynamically.
+   * Set a rule to generate the parent queue dynamically. The parent rule
+   * should only be called on rule creation when the policy is read from the
+   * configuration.
    * @param parent A PlacementRule
    */
-  void setParentRule(PlacementRule parent) {
+  public void setParentRule(PlacementRule parent) {
     this.parentRule = parent;
   }
 
@@ -69,7 +71,8 @@ public abstract class FSPlacementRule extends PlacementRule {
    * Get the rule that is set to generate the parent queue dynamically.
    * @return The rule set or <code>null</code> if not set.
    */
-  PlacementRule getParentRule() {
+  @VisibleForTesting
+  public PlacementRule getParentRule() {
     return parentRule;
   }
 
@@ -150,6 +153,14 @@ public abstract class FSPlacementRule extends PlacementRule {
   }
 
   /**
+   * Get the create flag as set during the config setup.
+   * @return The value of the {@link #createQueue} flag
+   */
+  public boolean getCreateFlag() {
+    return createQueue;
+  }
+
+  /**
    * Get the create flag from the xml configuration element.
    * @param conf The FS configuration element for the queue
    * @return <code>false</code> only if the flag is set in the configuration to
@@ -159,7 +170,7 @@ public abstract class FSPlacementRule extends PlacementRule {
   boolean getCreateFlag(Element conf) {
     if (conf != null) {
       String create = conf.getAttribute("create");
-      return Boolean.parseBoolean(create);
+      return create.isEmpty() || Boolean.parseBoolean(create);
     }
     return true;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
index 0b5fe2e..adee5d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
@@ -28,7 +28,7 @@ import java.io.IOException;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
 
 /**
- * Utility class for QueuePlacementRule.
+ * Utility class for Capacity Scheduler queue PlacementRules.
  */
 public final class QueuePlacementRuleUtils {
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 826d9f5..7f06521 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ReservationACL;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -94,10 +93,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
   //Map for maximum container resource allocation per queues by queue name
   private final Map<String, Resource> queueMaxContainerAllocationMap;
 
-  // Policy for mapping apps to queues
-  @VisibleForTesting
-  QueuePlacementPolicy placementPolicy;
-
   //Configured queues in the alloc xml
   @VisibleForTesting
   Map<FSQueueType, Set<String>> configuredQueues;
@@ -107,9 +102,16 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
 
   private final Set<String> nonPreemptableQueues;
 
+  /**
+   * Create a fully initialised configuration for the scheduler.
+   * @param queueProperties The list of queues and their properties from the
+   *                        configuration.
+   * @param allocationFileParser The allocation file parser
+   * @param globalReservationQueueConfig The reservation queue config
+   * @throws AllocationConfigurationException
+   */
   public AllocationConfiguration(QueueProperties queueProperties,
       AllocationFileParser allocationFileParser,
-      QueuePlacementPolicy newPlacementPolicy,
       ReservationQueueConfiguration globalReservationQueueConfig)
       throws AllocationConfigurationException {
     this.minQueueResources = queueProperties.getMinQueueResources();
@@ -138,14 +140,19 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
     this.resAcls = queueProperties.getReservationAcls();
     this.reservableQueues = queueProperties.getReservableQueues();
     this.globalReservationQueueConfig = globalReservationQueueConfig;
-    this.placementPolicy = newPlacementPolicy;
     this.configuredQueues = queueProperties.getConfiguredQueues();
     this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues();
     this.queueMaxContainerAllocationMap =
         queueProperties.getMaxContainerAllocation();
   }
 
-  public AllocationConfiguration(Configuration conf) {
+  /**
+   * Create a base scheduler configuration with just the defaults set.
+   * Should only be called to init a basic setup on scheduler init.
+   * @param scheduler The {@link FairScheduler} to create and initialise the
+   *                  placement policy.
+   */
+  public AllocationConfiguration(FairScheduler scheduler) {
     minQueueResources = new HashMap<>();
     maxChildQueueResources = new HashMap<>();
     maxQueueResources = new HashMap<>();
@@ -169,8 +176,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
     for (FSQueueType queueType : FSQueueType.values()) {
       configuredQueues.put(queueType, new HashSet<>());
     }
-    placementPolicy =
-        QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
+    QueuePlacementPolicy.fromConfiguration(scheduler);
     nonPreemptableQueues = new HashSet<>();
     queueMaxContainerAllocationMap = new HashMap<>();
   }
@@ -309,10 +315,6 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
     return configuredQueues;
   }
 
-  public QueuePlacementPolicy getPlacementPolicy() {
-    return placementPolicy;
-  }
-
   @Override
   public boolean isReservable(String queue) {
     return reservableQueues.contains(queue);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index b92bd5d..829188d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -78,6 +78,7 @@ public class AllocationFileLoaderService extends AbstractService {
       "(?i)(hdfs)|(file)|(s3a)|(viewfs)";
 
   private final Clock clock;
+  private final FairScheduler scheduler;
 
   // Last time we successfully reloaded queues
   private volatile long lastSuccessfulReload;
@@ -95,14 +96,15 @@ public class AllocationFileLoaderService extends AbstractService {
   private Thread reloadThread;
   private volatile boolean running = true;
 
-  public AllocationFileLoaderService() {
-    this(SystemClock.getInstance());
+  AllocationFileLoaderService(FairScheduler scheduler) {
+    this(SystemClock.getInstance(), scheduler);
   }
 
   private List<Permission> defaultPermissions;
 
-  public AllocationFileLoaderService(Clock clock) {
+  AllocationFileLoaderService(Clock clock, FairScheduler scheduler) {
     super(AllocationFileLoaderService.class.getName());
+    this.scheduler = scheduler;
     this.clock = clock;
   }
 
@@ -254,17 +256,15 @@ public class AllocationFileLoaderService extends AbstractService {
         new AllocationFileQueueParser(allocationFileParser.getQueueElements());
     QueueProperties queueProperties = queueParser.parse();
 
-    // Load placement policy and pass it configured queues
-    Configuration conf = getConfig();
-    QueuePlacementPolicy newPlacementPolicy =
-        getQueuePlacementPolicy(allocationFileParser, queueProperties, conf);
+    // Load placement policy
+    getQueuePlacementPolicy(allocationFileParser);
     setupRootQueueProperties(allocationFileParser, queueProperties);
 
     ReservationQueueConfiguration globalReservationQueueConfig =
         createReservationQueueConfig(allocationFileParser);
 
     AllocationConfiguration info = new AllocationConfiguration(queueProperties,
-        allocationFileParser, newPlacementPolicy, globalReservationQueueConfig);
+        allocationFileParser, globalReservationQueueConfig);
 
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
@@ -272,17 +272,15 @@ public class AllocationFileLoaderService extends AbstractService {
     reloadListener.onReload(info);
   }
 
-  private QueuePlacementPolicy getQueuePlacementPolicy(
-      AllocationFileParser allocationFileParser,
-      QueueProperties queueProperties, Configuration conf)
+  private void getQueuePlacementPolicy(
+      AllocationFileParser allocationFileParser)
       throws AllocationConfigurationException {
     if (allocationFileParser.getQueuePlacementPolicy().isPresent()) {
-      return QueuePlacementPolicy.fromXml(
+      QueuePlacementPolicy.fromXml(
           allocationFileParser.getQueuePlacementPolicy().get(),
-          queueProperties.getConfiguredQueues(), conf);
+          scheduler);
     } else {
-      return QueuePlacementPolicy.fromConfiguration(conf,
-          queueProperties.getConfiguredQueues());
+      QueuePlacementPolicy.fromConfiguration(scheduler);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 8324f8d..166bb48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -214,7 +215,7 @@ public class FairScheduler extends
   public FairScheduler() {
     super(FairScheduler.class.getName());
     context = new FSContext(this);
-    allocsLoader = new AllocationFileLoaderService();
+    allocsLoader = new AllocationFileLoaderService(this);
     queueMgr = new QueueManager(this);
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
@@ -223,6 +224,10 @@ public class FairScheduler extends
     return context;
   }
 
+  public RMContext getRMContext() {
+    return rmContext;
+  }
+
   public boolean isAtLeastReservationThreshold(
       ResourceCalculator resourceCalculator, Resource resource) {
     return Resources.greaterThanOrEqual(resourceCalculator,
@@ -455,32 +460,52 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected void addApplication(ApplicationId applicationId,
-      String queueName, String user, boolean isAppRecovering) {
-    if (queueName == null || queueName.isEmpty()) {
-      String message =
-          "Reject application " + applicationId + " submitted by user " + user
-              + " with an empty queue name.";
-      rejectApplicationWithMessage(applicationId, message);
-      return;
-    }
-
-    if (queueName.startsWith(".") || queueName.endsWith(".")) {
-      String message =
-          "Reject application " + applicationId + " submitted by user " + user
-              + " with an illegal queue name " + queueName + ". "
-              + "The queue name cannot start/end with period.";
+      String queueName, String user, boolean isAppRecovering,
+      ApplicationPlacementContext placementContext) {
+    // If the placement was rejected the placementContext will be null.
+    // We ignore placement rules on recovery.
+    if (!isAppRecovering && placementContext == null) {
+      String message = "Reject application " + applicationId +
+          " submitted by user " + user +
+          " application rejected by placement rules.";
       rejectApplicationWithMessage(applicationId, message);
       return;
     }
 
     writeLock.lock();
     try {
-      RMApp rmApp = rmContext.getRMApps().get(applicationId);
-      FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
+      // Assign the app to the queue, creating it and preventing queue delete.
+      FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
+          applicationId);
       if (queue == null) {
+        rejectApplicationWithMessage(applicationId,
+            queueName + " is not a leaf queue");
+        return;
+      }
+
+      // Enforce ACLs: 2nd check, there could be a time lapse between the app
+      // creation in the RMAppManager and getting here. That means we could
+      // have a configuration change (prevent race condition)
+      UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
+          user);
+
+      if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
+          !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
+        String msg = "User " + user + " does not have permission to submit " +
+            applicationId + " to queue " + queueName;
+        rejectApplicationWithMessage(applicationId, msg);
+        queue.removeAssignedApp(applicationId);
         return;
       }
 
+      RMApp rmApp = rmContext.getRMApps().get(applicationId);
+      if (rmApp != null) {
+        rmApp.setQueue(queueName);
+      } else {
+        LOG.error("Couldn't find RM app for " + applicationId +
+            " to set queue name on");
+      }
+
       if (rmApp != null && rmApp.getAMResourceRequests() != null) {
         // Resources.fitsIn would always return false when queueMaxShare is 0
         // for any resource, but only using Resources.fitsIn is not enough
@@ -499,7 +524,7 @@ public class FairScheduler extends
                           + "it has zero amount of resource for a requested "
                           + "resource! Invalid requested AM resources: %s, "
                           + "maximum queue resources: %s",
-                  applicationId, queue.getName(),
+                  applicationId, queueName,
                   invalidAMResourceRequests, queue.getMaxShare());
           rejectApplicationWithMessage(applicationId, msg);
           queue.removeAssignedApp(applicationId);
@@ -507,27 +532,13 @@ public class FairScheduler extends
         }
       }
 
-      // Enforce ACLs
-      UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
-          user);
-
-      if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
-          .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
-        String msg = "User " + userUgi.getUserName()
-            + " cannot submit applications to queue " + queue.getName()
-            + "(requested queuename is " + queueName + ")";
-        rejectApplicationWithMessage(applicationId, msg);
-        queue.removeAssignedApp(applicationId);
-        return;
-      }
-
       SchedulerApplication<FSAppAttempt> application =
-          new SchedulerApplication<FSAppAttempt>(queue, user);
+          new SchedulerApplication<>(queue, user);
       applications.put(applicationId, application);
       queue.getMetrics().submitApp(user);
 
       LOG.info("Accepted application " + applicationId + " from user: " + user
-          + ", in queue: " + queue.getName()
+          + ", in queue: " + queueName
           + ", currently num of applications: " + applications.size());
       if (isAppRecovering) {
         LOG.debug("{} is recovering. Skip notifying APP_ACCEPTED",
@@ -596,60 +607,6 @@ public class FairScheduler extends
     }
   }
 
-  /**
-   * Helper method for the tests to assign the app to a queue.
-   */
-  @VisibleForTesting
-  FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
-    return assignToQueue(rmApp, queueName, user, null);
-  }
-
-  /**
-   * Helper method that attempts to assign the app to a queue. The method is
-   * responsible to call the appropriate event-handler if the app is rejected.
-   */
-  private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
-        ApplicationId applicationId) {
-    FSLeafQueue queue = null;
-    String appRejectMsg = null;
-
-    try {
-      QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
-      queueName = placementPolicy.assignAppToQueue(queueName, user);
-      if (queueName == null) {
-        appRejectMsg = "Application rejected by queue placement policy";
-      } else {
-        queue = queueMgr.getLeafQueue(queueName, true, applicationId);
-        if (queue == null) {
-          appRejectMsg = queueName + " is not a leaf queue";
-        }
-      }
-    } catch (IllegalStateException se) {
-      appRejectMsg = "Unable to match app " + rmApp.getApplicationId() +
-          " to a queue placement policy, and no valid terminal queue " +
-          " placement rule is configured. Please contact an administrator " +
-          " to confirm that the fair scheduler configuration contains a " +
-          " valid terminal queue placement rule.";
-    } catch (InvalidQueueNameException qne) {
-      appRejectMsg = qne.getMessage();
-    } catch (IOException ioe) {
-      // IOException should only happen for a user without groups
-      appRejectMsg = "Error assigning app to a queue: " + ioe.getMessage();
-    }
-
-    if (appRejectMsg != null && rmApp != null) {
-      rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
-      return null;
-    }
-
-    if (rmApp != null) {
-      rmApp.setQueue(queue.getName());
-    } else {
-      LOG.error("Couldn't find RM app to set queue name on");
-    }
-    return queue;
-  }
-
   private void removeApplication(ApplicationId applicationId,
       RMAppState finalState) {
     SchedulerApplication<FSAppAttempt> application = applications.remove(
@@ -1265,7 +1222,8 @@ public class FairScheduler extends
       if (queueName != null) {
         addApplication(appAddedEvent.getApplicationId(),
             queueName, appAddedEvent.getUser(),
-            appAddedEvent.getIsAppRecovering());
+            appAddedEvent.getIsAppRecovering(),
+            appAddedEvent.getPlacementContext());
       }
       break;
     case APP_REMOVED:
@@ -1442,12 +1400,8 @@ public class FairScheduler extends
       // This stores per-application scheduling information
       this.applications = new ConcurrentHashMap<>();
 
-      allocConf = new AllocationConfiguration(conf);
-      try {
-        queueMgr.initialize(conf);
-      } catch (Exception e) {
-        throw new IOException("Failed to start FairScheduler", e);
-      }
+      allocConf = new AllocationConfiguration(this);
+      queueMgr.initialize();
 
       if (continuousSchedulingEnabled) {
         // Continuous scheduling is deprecated log it on startup
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 6a1f953..4fe38d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -18,34 +18,28 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import javax.xml.parsers.ParserConfigurationException;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.xml.sax.SAXException;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Iterator;
-import java.util.Set;
 
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
@@ -106,8 +100,7 @@ public class QueueManager {
     return rootQueue;
   }
 
-  public void initialize(Configuration conf) throws IOException,
-      SAXException, AllocationConfigurationException, ParserConfigurationException {
+  public void initialize() {
     // Policies of root and default queue are set to
     // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
     // loaded yet.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
index 30ea213..904fb2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
@@ -23,85 +23,261 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.RejectPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory.getPlacementRule;
+
+/**
+ * The FairScheduler rules based policy for placing an application in a queue.
+ * It parses the configuration and updates the {@link
+ * org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager}
+ * with a list of {@link PlacementRule}s to execute in order.
+ */
 @Private
 @Unstable
-public class QueuePlacementPolicy {
-  private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses;
+final class QueuePlacementPolicy {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(QueuePlacementPolicy.class);
+
+  // Simple private class to make the rule mapping simpler.
+  private static final class RuleMap {
+    private final Class<? extends PlacementRule> ruleClass;
+    private final String terminal;
+
+    private RuleMap(Class<? extends PlacementRule> clazz, String terminate) {
+      this.ruleClass = clazz;
+      this.terminal = terminate;
+    }
+  }
+
+  // The list of known rules:
+  // key to the map is the name in the configuration.
+  // for each name the mapping contains the class name of the implementation
+  // and a flag (true, false or create) which describes the terminal state
+  // see the method getTerminal() for more comments.
+  private static final Map<String, RuleMap> RULES;
   static {
-    Map<String, Class<? extends QueuePlacementRule>> map =
-        new HashMap<String, Class<? extends QueuePlacementRule>>();
-    map.put("user", QueuePlacementRule.User.class);
-    map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
+    Map<String, RuleMap> map = new HashMap<>();
+    map.put("user", new RuleMap(UserPlacementRule.class, "create"));
+    map.put("primaryGroup",
+        new RuleMap(PrimaryGroupPlacementRule.class, "create"));
     map.put("secondaryGroupExistingQueue",
-        QueuePlacementRule.SecondaryGroupExistingQueue.class);
-    map.put("specified", QueuePlacementRule.Specified.class);
-    map.put("nestedUserQueue",
-        QueuePlacementRule.NestedUserQueue.class);
-    map.put("default", QueuePlacementRule.Default.class);
-    map.put("reject", QueuePlacementRule.Reject.class);
-    ruleClasses = Collections.unmodifiableMap(map);
+        new RuleMap(SecondaryGroupExistingPlacementRule.class, "false"));
+    map.put("specified", new RuleMap(SpecifiedPlacementRule.class, "false"));
+    map.put("nestedUserQueue", new RuleMap(UserPlacementRule.class, "create"));
+    map.put("default", new RuleMap(DefaultPlacementRule.class, "create"));
+    map.put("reject", new RuleMap(RejectPlacementRule.class, "true"));
+    RULES = Collections.unmodifiableMap(map);
+  }
+
+  private QueuePlacementPolicy() {
   }
-  
-  private final List<QueuePlacementRule> rules;
-  private final Map<FSQueueType, Set<String>> configuredQueues;
-  private final Groups groups;
-  
-  public QueuePlacementPolicy(List<QueuePlacementRule> rules,
-      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+
+  /**
+   * Update the rules in the manager based on this placement policy.
+   * @param newRules The new list of rules to set in the manager.
+   * @param newTerminalState The list of terminal states for this set of rules.
+   * @param fs the reference to the scheduler needed in the rule on init.
+   * @throws AllocationConfigurationException for any errors
+   */
+  private static void updateRuleSet(List<PlacementRule> newRules,
+                                    List<Boolean> newTerminalState,
+                                    FairScheduler fs)
       throws AllocationConfigurationException {
-    for (int i = 0; i < rules.size()-1; i++) {
-      if (rules.get(i).isTerminal()) {
+    if (newRules.isEmpty()) {
+      LOG.debug("Empty rule set defined, ignoring update");
+      return;
+    }
+    LOG.debug("Placement rule order check");
+    for (int i = 0; i < newTerminalState.size()-1; i++) {
+      if (newTerminalState.get(i)) {
         throw new AllocationConfigurationException("Rules after rule "
-            + i + " in queue placement policy can never be reached");
+            + (i+1) + " in queue placement policy can never be reached");
       }
     }
-    if (!rules.get(rules.size()-1).isTerminal()) {
+    if (!newTerminalState.get(newTerminalState.size()-1)) {
       throw new AllocationConfigurationException(
           "Could get past last queue placement rule without assigning");
     }
-    this.rules = rules;
-    this.configuredQueues = configuredQueues;
-    groups = new Groups(conf);
+    // Set the scheduler in the rule to get queues etc
+    LOG.debug("Initialising new rule set");
+    try {
+      for (PlacementRule rule: newRules){
+        rule.initialize(fs);
+      }
+    } catch (IOException ioe) {
+      // We should never throw as we pass in a FS object, however we still
+      // should consider any exception here a config error.
+      throw new AllocationConfigurationException(
+          "Rule initialisation failed with exception", ioe);
+    }
+    // Update the placement manager with the new rule list.
+    // We only get here when all rules are OK.
+    fs.getRMContext().getQueuePlacementManager().updateRules(newRules);
+    LOG.debug("PlacementManager active with new rule set");
   }
-  
+
   /**
-   * Builds a QueuePlacementPolicy from an xml element.
+   * Builds a QueuePlacementPolicy from an XML element.
+   * @param confElement the placement policy xml snippet from the
+   *                    {@link FairSchedulerConfiguration}
+   * @param fs the reference to the scheduler needed in the rule on init.
+   * @throws AllocationConfigurationException for any errors
    */
-  public static QueuePlacementPolicy fromXml(Element el,
-      Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+  static void fromXml(Element confElement, FairScheduler fs)
       throws AllocationConfigurationException {
-    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
-    NodeList elements = el.getChildNodes();
+    LOG.debug("Reloading placement policy from allocation config");
+    if (confElement == null || !confElement.hasChildNodes()) {
+      throw new AllocationConfigurationException(
+          "Empty configuration for QueuePlacementPolicy is not allowed");
+    }
+    List<PlacementRule> newRules = new ArrayList<>();
+    List<Boolean> newTerminalState = new ArrayList<>();
+    NodeList elements = confElement.getChildNodes();
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
-      if (node instanceof Element) {
-        QueuePlacementRule rule = createAndInitializeRule(node);
-        rules.add(rule);
+      if (node instanceof Element &&
+          node.getNodeName().equalsIgnoreCase("rule")) {
+        String name = ((Element) node).getAttribute("name");
+        LOG.debug("Creating new rule: {}", name);
+        PlacementRule rule = createRule((Element)node);
+
+        // The only child node that we currently know is a parent rule
+        PlacementRule parentRule = null;
+        String parentName = null;
+        Element child = getParentRuleElement(node);
+        if (child != null) {
+          parentName = child.getAttribute("name");
+          parentRule = getParentRule(child, fs);
+        }
+        // Need to make sure that the nestedUserQueue has a parent for
+        // backwards compatibility
+        if (name.equalsIgnoreCase("nestedUserQueue") && parentRule == null) {
+          throw new AllocationConfigurationException("Rule '" + name
+              + "' must have a parent rule set");
+        }
+        newRules.add(rule);
+        if (parentRule == null) {
+          newTerminalState.add(
+              getTerminal(RULES.get(name).terminal, rule));
+        } else {
+          ((FSPlacementRule)rule).setParentRule(parentRule);
+          newTerminalState.add(
+              getTerminal(RULES.get(name).terminal, rule) &&
+              getTerminal(RULES.get(parentName).terminal, parentRule));
+        }
+      }
+    }
+    updateRuleSet(newRules, newTerminalState, fs);
+  }
+
+  /**
+   * Find the element that defines the parent rule.
+   * @param node the xml node to check for a parent rule
+   * @return {@link Element} that describes the parent rule or
+   * <code>null</code> if none is found
+   */
+  private static Element getParentRuleElement(Node node)
+      throws AllocationConfigurationException {
+    Element parent = null;
+    // walk over the node list
+    if (node.hasChildNodes()) {
+      NodeList childList = node.getChildNodes();
+      for (int j = 0; j < childList.getLength(); j++) {
+        Node child = childList.item(j);
+        if (child instanceof Element &&
+            child.getNodeName().equalsIgnoreCase("rule")) {
+          if (parent != null) {
+            LOG.warn("Rule '{}' has multiple parent rules defined, only the " +
+                "last parent rule will be used",
+                ((Element) node).getAttribute("name"));
+          }
+          parent = ((Element) child);
+        }
+      }
+    }
+    // sanity check the rule that is configured
+    if (parent != null) {
+      String parentName = parent.getAttribute("name");
+      if (parentName.equals("reject") ||
+          parentName.equals("nestedUserQueue")) {
+        throw new AllocationConfigurationException("Rule '"
+            + parentName
+            + "' is not allowed as a parent rule for any rule");
       }
     }
-    return new QueuePlacementPolicy(rules, configuredQueues, conf);
+    return parent;
   }
-  
+
   /**
-   * Create and initialize a rule given a xml node
-   * @param node
-   * @return QueuePlacementPolicy
-   * @throws AllocationConfigurationException
+   * Retrieve the configured parent rule from the xml config.
+   * @param parent the xml element that contains the name of the rule to add.
+   * @param fs the reference to the scheduler needed in the rule on init.
+   * @return {@link PlacementRule} to set as a parent
+   * @throws AllocationConfigurationException for any error
    */
-  public static QueuePlacementRule createAndInitializeRule(Node node)
+  private static PlacementRule getParentRule(Element parent,
+                                             FairScheduler fs)
+      throws AllocationConfigurationException {
+    LOG.debug("Creating new parent rule: {}", parent.getAttribute("name"));
+    PlacementRule parentRule = createRule(parent);
+    // Init the rule, we do not want to add it to the list of the
+    // placement manager
+    try {
+      parentRule.initialize(fs);
+    } catch (IOException ioe) {
+      // We should never throw as we pass in a FS object, however we
+      // still should consider any exception here a config error.
+      throw new AllocationConfigurationException(
+          "Parent Rule initialisation failed with exception", ioe);
+    }
+    return parentRule;
+  }
+
+  /**
+   * Returns the terminal status of the rule based on the definition and the
+   * create flag set in the rule.
+   * @param terminal The definition of the terminal flag
+   * @param rule The rule to check
+   * @return <code>true</code> if the rule is terminal <code>false</code> in
+   * all other cases.
+   */
+  private static Boolean getTerminal(String terminal, PlacementRule rule) {
+    switch (terminal) {
+    case "true":    // rule is always terminal
+      return true;
+    case "false":   // rule is never terminal
+      return false;
+    default:        // rule is terminal based on the create flag
+      return ((FSPlacementRule)rule).getCreateFlag();
+    }
+  }
+
+  /**
+   * Create a rule from a given xml node.
+   * @param element the xml element to create the rule from
+   * @return PlacementRule
+   * @throws AllocationConfigurationException for any error
+   */
+  @SuppressWarnings("unchecked")
+  private static PlacementRule createRule(Element element)
       throws AllocationConfigurationException {
-    Element element = (Element) node;
 
     String ruleName = element.getAttribute("name");
     if ("".equals(ruleName)) {
@@ -109,72 +285,54 @@ public class QueuePlacementPolicy {
           + "rule element");
     }
 
-    Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
-    if (clazz == null) {
+    Class<? extends PlacementRule> ruleClass = null;
+    if (RULES.containsKey(ruleName)) {
+      ruleClass = RULES.get(ruleName).ruleClass;
+    }
+    if (ruleClass == null) {
       throw new AllocationConfigurationException("No rule class found for "
           + ruleName);
     }
-    QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
-    rule.initializeFromXml(element);
-    return rule;
+    return getPlacementRule(ruleClass, element);
   }
     
   /**
-   * Build a simple queue placement policy from the allow-undeclared-pools and
-   * user-as-default-queue configuration options.
+   * Build a simple queue placement policy from the configuration options
+   * {@link FairSchedulerConfiguration#ALLOW_UNDECLARED_POOLS} and
+   * {@link FairSchedulerConfiguration#USER_AS_DEFAULT_QUEUE}.
+   * @param fs the reference to the scheduler needed in the rule on init.
    */
-  public static QueuePlacementPolicy fromConfiguration(Configuration conf,
-      Map<FSQueueType, Set<String>> configuredQueues) {
+  static void fromConfiguration(FairScheduler fs) {
+    LOG.debug("Creating base placement policy from config");
+    Configuration conf = fs.getConfig();
+
     boolean create = conf.getBoolean(
         FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
         FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
     boolean userAsDefaultQueue = conf.getBoolean(
         FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
         FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
-    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
-    rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+    List<PlacementRule> newRules = new ArrayList<>();
+    List<Boolean> newTerminalState = new ArrayList<>();
+    Class<? extends PlacementRule> clazz =
+        RULES.get("specified").ruleClass;
+    newRules.add(getPlacementRule(clazz, create));
+    newTerminalState.add(false);
     if (userAsDefaultQueue) {
-      rules.add(new QueuePlacementRule.User().initialize(create, null));
+      clazz = RULES.get("user").ruleClass;
+      newRules.add(getPlacementRule(clazz, create));
+      newTerminalState.add(create);
     }
     if (!userAsDefaultQueue || !create) {
-      rules.add(new QueuePlacementRule.Default().initialize(true, null));
+      clazz = RULES.get("default").ruleClass;
+      newRules.add(getPlacementRule(clazz, true));
+      newTerminalState.add(true);
     }
     try {
-      return new QueuePlacementPolicy(rules, configuredQueues, conf);
+      updateRuleSet(newRules, newTerminalState, fs);
     } catch (AllocationConfigurationException ex) {
       throw new RuntimeException("Should never hit exception when loading" +
-      		"placement policy from conf", ex);
+          "placement policy from conf", ex);
     }
   }
-
-  /**
-   * Applies this rule to an app with the given requested queue and user/group
-   * information.
-   * 
-   * @param requestedQueue
-   *    The queue specified in the ApplicationSubmissionContext
-   * @param user
-   *    The user submitting the app
-   * @return
-   *    The name of the queue to assign the app to.  Or null if the app should
-   *    be rejected.
-   * @throws IOException
-   *    If an exception is encountered while getting the user's groups
-   */
-  public String assignAppToQueue(String requestedQueue, String user)
-      throws IOException {
-    for (QueuePlacementRule rule : rules) {
-      String queue = rule.assignAppToQueue(requestedQueue, user, groups,
-          configuredQueues);
-      if (queue == null || !queue.isEmpty()) {
-        return queue;
-      }
-    }
-    throw new IllegalStateException("Should have applied a rule before " +
-    		"reaching here");
-  }
-  
-  public List<QueuePlacementRule> getRules() {
-    return rules;
-  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
deleted file mode 100644
index 1d48a21..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.Groups;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.w3c.dom.Element;
-import org.w3c.dom.NamedNodeMap;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-import com.google.common.annotations.VisibleForTesting;
-
-@Private
-@Unstable
-public abstract class QueuePlacementRule {
-  protected boolean create;
-  public static final Logger LOG =
-      LoggerFactory.getLogger(QueuePlacementRule.class.getName());
-
-  /**
-   * Initializes the rule with any arguments.
-   * 
-   * @param args
-   *    Additional attributes of the rule's xml element other than create.
-   */
-  public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
-    this.create = create;
-    return this;
-  }
-  
-  /**
-   * 
-   * @param requestedQueue
-   *    The queue explicitly requested.
-   * @param user
-   *    The user submitting the app.
-   * @param groups
-   *    The groups of the user submitting the app.
-   * @param configuredQueues
-   *    The queues specified in the scheduler configuration.
-   * @return
-   *    The queue to place the app into. An empty string indicates that we should
-   *    continue to the next rule, and null indicates that the app should be rejected.
-   */
-  public String assignAppToQueue(String requestedQueue, String user,
-      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
-      throws IOException {
-   String queue = getQueueForApp(requestedQueue, user, groups,
-        configuredQueues);
-    if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
-        || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
-      return queue;
-    } else {
-      return "";
-    }
-  }
-  
-  public void initializeFromXml(Element el)
-      throws AllocationConfigurationException {
-    boolean create = true;
-    NamedNodeMap attributes = el.getAttributes();
-    Map<String, String> args = new HashMap<String, String>();
-    for (int i = 0; i < attributes.getLength(); i++) {
-      Node node = attributes.item(i);
-      String key = node.getNodeName();
-      String value = node.getNodeValue();
-      if (key.equals("create")) {
-        create = Boolean.parseBoolean(value);
-      } else {
-        args.put(key, value);
-      }
-    }
-    initialize(create, args);
-  }
-  
-  /**
-   * Returns true if this rule never tells the policy to continue.
-   */
-  public abstract boolean isTerminal();
-  
-  /**
-   * Applies this rule to an app with the given requested queue and user/group
-   * information.
-   * 
-   * @param requestedQueue
-   *    The queue specified in the ApplicationSubmissionContext
-   * @param user
-   *    The user submitting the app.
-   * @param groups
-   *    The groups of the user submitting the app.
-   * @return
-   *    The name of the queue to assign the app to, or null to empty string
-   *    continue to the next rule.
-   */
-  protected abstract String getQueueForApp(String requestedQueue, String user,
-      Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
-      throws IOException;
-
-  /**
-   * Places apps in queues by username of the submitter
-   */
-  public static class User extends QueuePlacementRule {
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
-      return "root." + cleanName(user);
-    }
-    
-    @Override
-    public boolean isTerminal() {
-      return create;
-    }
-  }
-  
-  /**
-   * Places apps in queues by primary group of the submitter
-   */
-  public static class PrimaryGroup extends QueuePlacementRule {
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
-        throws IOException {
-      final List<String> groupList = groups.getGroups(user);
-      if (groupList.isEmpty()) {
-        throw new IOException("No groups returned for user " + user);
-      }
-      return "root." + cleanName(groupList.get(0));
-    }
-    
-    @Override
-    public boolean isTerminal() {
-      return create;
-    }
-  }
-  
-  /**
-   * Places apps in queues by secondary group of the submitter
-   * 
-   * Match will be made on first secondary group that exist in
-   * queues
-   */
-  public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
-        throws IOException {
-      List<String> groupNames = groups.getGroups(user);
-      for (int i = 1; i < groupNames.size(); i++) {
-        String group = cleanName(groupNames.get(i));
-        if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
-            || configuredQueues.get(FSQueueType.PARENT).contains(
-                "root." + group)) {
-          return "root." + group;
-        }
-      }
-      
-      return "";
-    }
-        
-    @Override
-    public boolean isTerminal() {
-      return false;
-    }
-  }
-
-  /**
-   * Places apps in queues with name of the submitter under the queue
-   * returned by the nested rule.
-   */
-  public static class NestedUserQueue extends QueuePlacementRule {
-    @VisibleForTesting
-    QueuePlacementRule nestedRule;
-
-    /**
-     * Parse xml and instantiate the nested rule 
-     */
-    @Override
-    public void initializeFromXml(Element el)
-        throws AllocationConfigurationException {
-      NodeList elements = el.getChildNodes();
-
-      for (int i = 0; i < elements.getLength(); i++) {
-        Node node = elements.item(i);
-        if (node instanceof Element) {
-          Element element = (Element) node;
-          if ("rule".equals(element.getTagName())) {
-            QueuePlacementRule rule = QueuePlacementPolicy
-                .createAndInitializeRule(node);
-            if (rule == null) {
-              throw new AllocationConfigurationException(
-                  "Unable to create nested rule in nestedUserQueue rule");
-            }
-            this.nestedRule = rule;
-            break;
-          } else {
-            continue;
-          }
-        }
-      }
-
-      if (this.nestedRule == null) {
-        throw new AllocationConfigurationException(
-            "No nested rule specified in <nestedUserQueue> rule");
-      }
-      super.initializeFromXml(el);
-    }
-    
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
-        throws IOException {
-      // Apply the nested rule
-      String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
-          groups, configuredQueues);
-      
-      if (queueName != null && queueName.length() != 0) {
-        if (!queueName.startsWith("root.")) {
-          queueName = "root." + queueName;
-        }
-        
-        // Verify if the queue returned by the nested rule is an configured leaf queue,
-        // if yes then skip to next rule in the queue placement policy
-        if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
-          return "";
-        }
-        return queueName + "." + cleanName(user);
-      }
-      return queueName;
-    }
-
-    @Override
-    public boolean isTerminal() {
-      return false;
-    }
-  }
-  
-  /**
-   * Places apps in queues by requested queue of the submitter
-   */
-  public static class Specified extends QueuePlacementRule {
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
-      if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
-        return "";
-      } else {
-        if (!requestedQueue.startsWith("root.")) {
-          requestedQueue = "root." + requestedQueue;
-        }
-        return requestedQueue;
-      }
-    }
-    
-    @Override
-    public boolean isTerminal() {
-      return false;
-    }
-  }
-  
-  /**
-   * Places apps in the specified default queue. If no default queue is
-   * specified the app is placed in root.default queue.
-   */
-  public static class Default extends QueuePlacementRule {
-    @VisibleForTesting
-    String defaultQueueName;
-
-    @Override
-    public QueuePlacementRule initialize(boolean create,
-        Map<String, String> args) {
-      if (defaultQueueName == null) {
-        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
-      }
-      return super.initialize(create, args);
-    }
-    
-    @Override
-    public void initializeFromXml(Element el)
-        throws AllocationConfigurationException {
-      defaultQueueName = el.getAttribute("queue");
-      if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
-        if (!defaultQueueName.startsWith("root.")) {
-          defaultQueueName = "root." + defaultQueueName;
-        }
-      } else {
-        defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
-      }
-      super.initializeFromXml(el);
-    }
-
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
-      return defaultQueueName;
-    }
-
-    @Override
-    public boolean isTerminal() {
-      return true;
-    }
-  }
-
-  /**
-   * Rejects all apps
-   */
-  public static class Reject extends QueuePlacementRule {
-    @Override
-    public String assignAppToQueue(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
-      return null;
-    }
-    
-    @Override
-    protected String getQueueForApp(String requestedQueue, String user,
-        Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
-      throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public boolean isTerminal() {
-      return true;
-    }
-  }
-
-  /**
-   * Replace the periods in the username or groupname with "_dot_" and
-   * remove trailing and leading whitespace.
-   */
-  protected String cleanName(String name) {
-    name = name.trim();
-    if (name.contains(".")) {
-      String converted = name.replaceAll("\\.", "_dot_");
-      LOG.warn("Name " + name + " is converted to " + converted
-          + " when it is used as a queue name.");
-      return converted;
-    } else {
-      return name;
-    }
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
index badf05f..c2ee8c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
@@ -169,8 +169,7 @@ public class QueueProperties {
     private Set<String> nonPreemptableQueues = new HashSet<>();
     // Remember all queue names so we can display them on web UI, etc.
     // configuredQueues is segregated based on whether it is a leaf queue
-    // or a parent queue. This information is used for creating queues
-    // and also for making queue placement decisions(QueuePlacementRule.java).
+    // or a parent queue. This information is used for creating queues.
     private Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
 
     Builder() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
index e30c2a6..d0dcae0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
@@ -18,32 +18,30 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.matches;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.HashMap;
+import java.util.Collections;
 
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.MockApps;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -51,39 +49,34 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 /**
- * Testing applications being retired from RM with fair scheduler.
- *
+ * Testing RMAppManager application submission with fair scheduler.
  */
-public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
+public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
 
   private static final String TEST_FOLDER = "test-queues";
 
   private static YarnConfiguration conf = new YarnConfiguration();
+  private PlacementManager placementMgr;
+  private TestRMAppManager rmAppManager;
+  private RMContext rmContext;
+  private static String allocFileName =
+      GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
 
-  @BeforeClass
-  public static void setup() throws IOException {
-    String allocFile =
-        GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
-
-    int queueMaxAllocation = 512;
-
-    PrintWriter out = new PrintWriter(new FileWriter(allocFile));
+  @Before
+  public void setup() throws IOException {
+    // Basic config with one queue (override in test if needed)
+    PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
-    out.println(" <queue name=\"queueA\">");
-    out.println("  <maxContainerAllocation>" + queueMaxAllocation
-        + " mb 1 vcores" + "</maxContainerAllocation>");
-    out.println(" </queue>");
-    out.println(" <queue name=\"queueB\">");
+    out.println(" <queue name=\"test\">");
     out.println(" </queue>");
     out.println("</allocations>");
     out.close();
@@ -91,87 +84,205 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);
 
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFileName);
+    placementMgr = mock(PlacementManager.class);
+
+    MockRM mockRM = new MockRM(conf);
+    rmContext = mockRM.getRMContext();
+    rmContext.setQueuePlacementManager(placementMgr);
+    ApplicationMasterService masterService = new ApplicationMasterService(
+        rmContext, rmContext.getScheduler());
+
+    rmAppManager = new TestRMAppManager(rmContext,
+        new ClientToAMTokenSecretManagerInRM(), rmContext.getScheduler(),
+        masterService, new ApplicationACLsManager(conf), conf);
   }
 
-  @AfterClass
-  public static void teardown(){
+  @After
+  public void teardown(){
     File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER);
     allocFile.delete();
   }
 
   @Test
   public void testQueueSubmitWithHighQueueContainerSize()
-      throws YarnException {
+      throws YarnException, IOException {
+    int maxAlloc =
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
 
-    ApplicationId appId = MockApps.newAppID(1);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-    Resource resource = Resources.createResource(
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-
-    ApplicationSubmissionContext asContext =
-        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    asContext.setApplicationId(appId);
-    asContext.setResource(resource);
-    asContext.setPriority(Priority.newInstance(0));
-    asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
-    asContext.setQueue("queueA");
-    QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class);
-
-    // Setup a PlacementManager returns a new queue
-    PlacementManager placementMgr = mock(PlacementManager.class);
-    doAnswer(new Answer<ApplicationPlacementContext>() {
-
-      @Override
-      public ApplicationPlacementContext answer(InvocationOnMock invocation)
-          throws Throwable {
-        return new ApplicationPlacementContext("queueA");
-      }
-
-    }).when(placementMgr).placeApplication(
-        any(ApplicationSubmissionContext.class), matches("test1"));
-    doAnswer(new Answer<ApplicationPlacementContext>() {
-
-      @Override
-      public ApplicationPlacementContext answer(InvocationOnMock invocation)
-          throws Throwable {
-        return new ApplicationPlacementContext("queueB");
-      }
-
-    }).when(placementMgr).placeApplication(
-        any(ApplicationSubmissionContext.class), matches("test2"));
-
-    MockRM newMockRM = new MockRM(conf);
-    RMContext newMockRMContext = newMockRM.getRMContext();
-    newMockRMContext.setQueuePlacementManager(placementMgr);
-    ApplicationMasterService masterService = new ApplicationMasterService(
-        newMockRMContext, newMockRMContext.getScheduler());
-
-    TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext,
-        new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(),
-        masterService, new ApplicationACLsManager(conf), conf);
+    // Scheduler config with one queue that caps the container allocation
+    PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("  <queue name=\"root\">");
+    out.println("    <queue name=\"limited\">");
+    out.println("      <maxContainerAllocation>" + maxAlloc + " mb 1 vcores");
+    out.println("      </maxContainerAllocation>");
+    out.println("    </queue>");
+    out.println("    <queue name=\"unlimited\">");
+    out.println("    </queue>");
+    out.println("  </queue>");
+    out.println("</allocations>");
+    out.close();
+    rmContext.getScheduler().reinitialize(conf, rmContext);
 
-    // only user test has permission to submit to 'test' queue
+    ApplicationId appId = MockApps.newAppID(1);
+    Resource res = Resources.createResource(maxAlloc + 1);
+    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
 
+    // Submit to limited queue
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("limited"));
     try {
-      newAppMonitor.submitApplication(asContext, "test1");
+      rmAppManager.submitApplication(asContext, "test");
       Assert.fail("Test should fail on too high allocation!");
     } catch (InvalidResourceRequestException e) {
       Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION,
           e.getInvalidResourceType());
     }
 
-    // Should not throw exception
-    newAppMonitor.submitApplication(asContext, "test2");
+    // Submit the same app again, now placed in the unlimited queue
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("root.unlimited"));
+    rmAppManager.submitApplication(asContext, "test");
   }
 
-  private static ContainerLaunchContext mockContainerLaunchContext(
-      RecordFactory recordFactory) {
-    ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
-        ContainerLaunchContext.class);
-    amContainer
-        .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
-    return amContainer;
+  @Test
+  public void testQueueSubmitWithPermissionLimits()
+      throws YarnException, IOException {
+
+    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+    PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("  <queue name=\"root\">");
+    out.println("    <aclSubmitApps> </aclSubmitApps>");
+    out.println("    <aclAdministerApps> </aclAdministerApps>");
+    out.println("    <queue name=\"noaccess\">");
+    out.println("    </queue>");
+    out.println("    <queue name=\"submitonly\">");
+    out.println("      <aclSubmitApps>test </aclSubmitApps>");
+    out.println("      <aclAdministerApps> </aclAdministerApps>");
+    out.println("    </queue>");
+    out.println("    <queue name=\"adminonly\">");
+    out.println("      <aclSubmitApps> </aclSubmitApps>");
+    out.println("      <aclAdministerApps>test </aclAdministerApps>");
+    out.println("    </queue>");
+    out.println("  </queue>");
+    out.println("</allocations>");
+    out.close();
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    ApplicationId appId = MockApps.newAppID(1);
+    Resource res = Resources.createResource(1024, 1);
+    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+    // Submit to no access queue
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("noaccess"));
+    try {
+      rmAppManager.submitApplication(asContext, "test");
+      Assert.fail("Test should have failed with access denied");
+    } catch (YarnException e) {
+      assertTrue("Access exception not found",
+          e.getCause() instanceof AccessControlException);
+    }
+    // Submit to the queue that grants user "test" submit access
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("submitonly"));
+    rmAppManager.submitApplication(asContext, "test");
+    // Submit a second app to the queue that grants "test" admin access
+    appId = MockApps.newAppID(2);
+    asContext = createAppSubmitCtx(appId, res);
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("adminonly"));
+    rmAppManager.submitApplication(asContext, "test");
+  }
+
+  @Test
+  public void testQueueSubmitWithRootPermission()
+      throws YarnException, IOException {
+
+    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+    PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("  <queue name=\"root\">");
+    out.println("    <queue name=\"noaccess\">");
+    out.println("      <aclSubmitApps> </aclSubmitApps>");
+    out.println("      <aclAdministerApps> </aclAdministerApps>");
+    out.println("    </queue>");
+    out.println("  </queue>");
+    out.println("</allocations>");
+    out.close();
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    ApplicationId appId = MockApps.newAppID(1);
+    Resource res = Resources.createResource(1024, 1);
+    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+    // Submitting to the noaccess queue should be allowed by the root ACL
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("noaccess"));
+    rmAppManager.submitApplication(asContext, "test");
+  }
+
+  @Test
+  public void testQueueSubmitWithAutoCreateQueue()
+      throws YarnException, IOException {
+
+    conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
+
+    PrintWriter out = new PrintWriter(new FileWriter(allocFileName));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("  <queue name=\"root\">");
+    out.println("    <aclSubmitApps> </aclSubmitApps>");
+    out.println("    <aclAdministerApps> </aclAdministerApps>");
+    out.println("    <queue name=\"noaccess\" type=\"parent\">");
+    out.println("    </queue>");
+    out.println("    <queue name=\"submitonly\" type=\"parent\">");
+    out.println("      <aclSubmitApps>test </aclSubmitApps>");
+    out.println("    </queue>");
+    out.println("  </queue>");
+    out.println("</allocations>");
+    out.close();
+    rmContext.getScheduler().reinitialize(conf, rmContext);
+
+    ApplicationId appId = MockApps.newAppID(1);
+    Resource res = Resources.createResource(1024, 1);
+    ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
+
+    // Submit to noaccess parent with non-existent child queue
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("root.noaccess.child"));
+    try {
+      rmAppManager.submitApplication(asContext, "test");
+      Assert.fail("Test should have failed with access denied");
+    } catch (YarnException e) {
+      assertTrue("Access exception not found",
+          e.getCause() instanceof AccessControlException);
+    }
+    // Submit to submitonly parent with non-existent child queue
+    when(placementMgr.placeApplication(any(), any()))
+        .thenReturn(new ApplicationPlacementContext("root.submitonly.child"));
+    rmAppManager.submitApplication(asContext, "test");
+  }
+
+  private ApplicationSubmissionContext createAppSubmitCtx(ApplicationId appId,
+                                                          Resource res) {
+    ApplicationSubmissionContext asContext =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    asContext.setApplicationId(appId);
+    ResourceRequest resReg =
+        ResourceRequest.newInstance(Priority.newInstance(0),
+            ResourceRequest.ANY, res, 1);
+    asContext.setAMContainerResourceRequests(
+        Collections.singletonList(resReg));
+    asContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
+    asContext.setQueue("default");
+    return asContext;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
index 667d089..e952c50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationACLs.java
@@ -466,7 +466,7 @@ public class TestApplicationACLs extends ParameterizedSchedulerTestBase {
     if (conf.get(YarnConfiguration.RM_SCHEDULER)
         .equals(FairScheduler.class.getName())) {
       Assert.assertTrue(appReport.getDiagnostics()
-          .contains("Application rejected by queue placement policy"));
+          .contains("user owner application rejected by placement rules."));
     } else {
       Assert.assertTrue(appReport.getDiagnostics()
           .contains("submitted by user owner to unknown queue: InvalidQueue"));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
index c6cebd9..f61d427 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -106,8 +108,14 @@ public abstract class TestSchedulerPlanFollowerBase {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     ApplicationAttemptId appAttemptId_0 =
         ApplicationAttemptId.newInstance(appId, 0);
-    AppAddedSchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+    AppAddedSchedulerEvent addAppEvent;
+    if (scheduler instanceof FairScheduler) {
+      addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
+          user_0, new ApplicationPlacementContext("dedicated"));
+    } else {
+      addAppEvent = new AppAddedSchedulerEvent(appId, q.getQueueName(),
+          user_0);
+    }
     scheduler.handle(addAppEvent);
     AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
         new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
index f25df07..2e645c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -978,14 +979,17 @@ public class TestSchedulerUtils {
   }
 
   public static SchedulerApplication<SchedulerApplicationAttempt>
-  verifyAppAddedAndRemovedFromScheduler(
-          Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
+      verifyAppAddedAndRemovedFromScheduler(
+          Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
+              applications,
           EventHandler<SchedulerEvent> handler, String queueName) {
 
+    ApplicationPlacementContext apc =
+        new ApplicationPlacementContext(queueName);
     ApplicationId appId =
             ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppAddedSchedulerEvent appAddedEvent =
-            new AppAddedSchedulerEvent(appId, queueName, "user");
+            new AppAddedSchedulerEvent(appId, queueName, "user", apc);
     handler.handle(appAddedEvent);
     SchedulerApplication<SchedulerApplicationAttempt> app =
             applications.get(appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 4f1f20b..f149cb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -177,7 +178,11 @@ public class FairSchedulerTestBase {
       Collection<ResourceRequest> requests, String queueId, String userId) {
     ApplicationAttemptId id =
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+    // This fakes the placement which is not part of the scheduler anymore
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext(queueId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
+        placementCtx);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
     if (scheduler.getSchedulerApplications()
@@ -212,10 +217,15 @@ public class FairSchedulerTestBase {
       String userId, List<ResourceRequest> ask) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
         this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
+    // This fakes the placement which is not part of the scheduler anymore
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext(queueId);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId, false,
+        placementCtx);
     // This conditional is for testAclSubmitApplication where app is rejected
     // and no app is added.
-    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+    if (scheduler.getSchedulerApplications().containsKey(
+        id.getApplicationId())) {
       scheduler.addApplicationAttempt(id, false, false);
     }
 
@@ -298,8 +308,11 @@ public class FairSchedulerTestBase {
     resourceManager.getRMContext().getRMApps().get(appId).handle(event);
     event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED);
     resourceManager.getRMContext().getRMApps().get(appId).handle(event);
+    // This fakes the placement which is not part of the scheduler anymore
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext(queue);
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
-        appId, queue, user);
+        appId, queue, user, placementCtx);
     scheduler.handle(appAddedEvent);
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 7afc3777..a02ef55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -25,14 +25,23 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Before;
 import org.junit.Test;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -52,7 +61,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+/**
+ * Test loading the allocation file for the FairScheduler.
+ */
 public class TestAllocationFileLoaderService {
 
   private static final String A_CUSTOM_RESOURCE = "a-custom-resource";
@@ -64,10 +78,28 @@ public class TestAllocationFileLoaderService {
       "test-queues").getAbsolutePath();
   private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
 
+  private FairScheduler scheduler;
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    SystemClock clock = SystemClock.getInstance();
+    PlacementManager placementManager = new PlacementManager();
+    FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+
+    scheduler = mock(FairScheduler.class);
+    conf = new YarnConfiguration();
+    when(scheduler.getClock()).thenReturn(clock);
+    when(scheduler.getConf()).thenReturn(fsConf);
+    when(scheduler.getConfig()).thenReturn(conf);
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+  }
+
   @Test
   public void testGetAllocationFileFromFileSystem()
       throws IOException, URISyntaxException {
-    Configuration conf = new YarnConfiguration();
     File baseDir =
         new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
     FileUtil.fullyDelete(baseDir);
@@ -83,7 +115,8 @@ public class TestAllocationFileLoaderService {
     fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     Path allocationFile = allocLoader.getAllocationFile(conf);
     assertEquals(fsAllocPath, allocationFile.toString());
     assertTrue(fs.exists(allocationFile));
@@ -94,9 +127,9 @@ public class TestAllocationFileLoaderService {
   @Test (expected = UnsupportedFileSystemException.class)
   public void testDenyGetAllocationFileFromUnsupportedFileSystem()
       throws UnsupportedFileSystemException {
-    Configuration conf = new YarnConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
 
     allocLoader.getAllocationFile(conf);
   }
@@ -104,12 +137,11 @@ public class TestAllocationFileLoaderService {
   @Test
   public void testGetAllocationFileFromClasspath() {
     try {
-      Configuration conf = new Configuration();
       FileSystem fs = FileSystem.get(conf);
       conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
           TEST_FAIRSCHED_XML);
       AllocationFileLoaderService allocLoader =
-          new AllocationFileLoaderService();
+          new AllocationFileLoaderService(scheduler);
       Path allocationFile = allocLoader.getAllocationFile(conf);
       assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
       assertTrue(fs.exists(allocationFile));
@@ -135,11 +167,10 @@ public class TestAllocationFileLoaderService {
 
     ControlledClock clock = new ControlledClock();
     clock.setTime(0);
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
-        clock);
+        clock, scheduler);
     allocLoader.reloadIntervalMs = 5;
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
@@ -148,10 +179,10 @@ public class TestAllocationFileLoaderService {
     AllocationConfiguration allocConf = confHolder.allocConf;
 
     // Verify conf
-    QueuePlacementPolicy policy = allocConf.getPlacementPolicy();
-    List<QueuePlacementRule> rules = policy.getRules();
+    List<PlacementRule> rules = scheduler.getRMContext()
+        .getQueuePlacementManager().getPlacementRules();
     assertEquals(1, rules.size());
-    assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
+    assertEquals(DefaultPlacementRule.class, rules.get(0).getClass());
     assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
     assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
         .size());
@@ -160,6 +191,7 @@ public class TestAllocationFileLoaderService {
     assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
         .contains("root.queueB"));
 
+    // reset the conf so we can detect the reload
     confHolder.allocConf = null;
 
     // Modify file and advance the clock
@@ -174,7 +206,6 @@ public class TestAllocationFileLoaderService {
     out.println("    <rule name='nestedUserQueue' >");
     out.println("         <rule name='primaryGroup' />");
     out.println("    </rule>");
-    out.println("    <rule name='default' />");
     out.println("  </queuePlacementPolicy>");
     out.println("</allocations>");
     out.close();
@@ -189,15 +220,13 @@ public class TestAllocationFileLoaderService {
 
     // Verify conf
     allocConf = confHolder.allocConf;
-    policy = allocConf.getPlacementPolicy();
-    rules = policy.getRules();
-    assertEquals(3, rules.size());
-    assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
-    assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
-        .getClass());
-    assertEquals(QueuePlacementRule.PrimaryGroup.class,
-        ((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
-    assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
+    rules = scheduler.getRMContext().getQueuePlacementManager()
+        .getPlacementRules();
+    assertEquals(2, rules.size());
+    assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
+    assertEquals(UserPlacementRule.class, rules.get(1).getClass());
+    assertEquals(PrimaryGroupPlacementRule.class,
+        ((FSPlacementRule)(rules.get(1))).getParentRule().getClass());
     assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
     assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
         .size());
@@ -207,11 +236,11 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testAllocationFileParsing() throws Exception {
-    Configuration conf = new YarnConfiguration();
     CustomResourceTypesConfigurationProvider.
         initResourceTypes(A_CUSTOM_RESOURCE);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
 
     AllocationFileWriter
             .create()
@@ -455,9 +484,9 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -569,7 +598,6 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testSimplePlacementPolicyFromConf() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
     conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
@@ -580,19 +608,20 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
     allocLoader.reloadAllocations();
-    AllocationConfiguration allocConf = confHolder.allocConf;
 
-    QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
-    List<QueuePlacementRule> rules = placementPolicy.getRules();
+    List<PlacementRule> rules = scheduler.getRMContext()
+        .getQueuePlacementManager().getPlacementRules();
     assertEquals(2, rules.size());
-    assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
-    assertEquals(false, rules.get(0).create);
-    assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
+    assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
+    assertFalse("Create flag was not set to false",
+        ((FSPlacementRule)rules.get(0)).getCreateFlag());
+    assertEquals(DefaultPlacementRule.class, rules.get(1).getClass());
   }
 
   /**
@@ -601,7 +630,6 @@ public class TestAllocationFileLoaderService {
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testQueueAlongsideRoot() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -614,7 +642,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -627,7 +656,6 @@ public class TestAllocationFileLoaderService {
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testQueueNameContainingPeriods() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -638,7 +666,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -651,7 +680,6 @@ public class TestAllocationFileLoaderService {
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testQueueNameContainingOnlyWhitespace() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -662,7 +690,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -671,7 +700,6 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testParentTagWithReservation() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -684,7 +712,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -700,7 +729,6 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testParentWithReservation() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -715,7 +743,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -731,7 +760,6 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testParentTagWithChild() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -744,7 +772,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -763,7 +792,6 @@ public class TestAllocationFileLoaderService {
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testQueueNameContainingNBWhitespace() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new OutputStreamWriter(
@@ -775,7 +803,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -787,17 +816,18 @@ public class TestAllocationFileLoaderService {
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
-    out.println("<defaultQueueSchedulingPolicy>fifo</defaultQueueSchedulingPolicy>");
+    out.println("<defaultQueueSchedulingPolicy>fifo" +
+        "</defaultQueueSchedulingPolicy>");
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -806,7 +836,6 @@ public class TestAllocationFileLoaderService {
 
   @Test
   public void testReservableQueue() throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -823,7 +852,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -845,11 +875,9 @@ public class TestAllocationFileLoaderService {
     assertTrue(allocConf.getMoveOnExpiry(reservableQueueName));
     assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
         allocConf.getReservationWindow(reservableQueueName));
-    assertEquals(100, allocConf.getInstantaneousMaxCapacity
-            (reservableQueueName),
-        0.0001);
-    assertEquals(
-        "DummyAgentName",
+    assertEquals(100,
+        allocConf.getInstantaneousMaxCapacity(reservableQueueName), 0.0001);
+    assertEquals("DummyAgentName",
         allocConf.getReservationAgent(reservableQueueName));
     assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001);
     assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName));
@@ -865,12 +893,11 @@ public class TestAllocationFileLoaderService {
 
   /**
    * Verify that you can't have dynamic user queue and reservable queue on
-   * the same queue
+   * the same queue.
    */
   @Test (expected = AllocationConfigurationException.class)
   public void testReservableCannotBeCombinedWithDynamicUserQueue()
       throws Exception {
-    Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
@@ -883,7 +910,8 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(scheduler);
     allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
     allocLoader.setReloadListener(confHolder);
@@ -891,7 +919,7 @@ public class TestAllocationFileLoaderService {
   }
 
   private class ReloadListener implements AllocationFileLoaderService.Listener {
-    public AllocationConfiguration allocConf;
+    private AllocationConfiguration allocConf;
 
     @Override
     public void onReload(AllocationConfiguration info) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
index f581935..03338f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -47,7 +48,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/*
+/**
  * This class is to  test the fair scheduler functionality of
  * deciding the number of runnable application under various conditions.
  */
@@ -78,7 +79,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
     scheduler.reinitialize(conf, resourceManager.getRMContext());
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+    createApplicationWithAMResource(appAttemptId, "root.user1", "user1", null);
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getNumRunnableApps());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -110,9 +111,11 @@ public class TestAppRunnability extends FairSchedulerTestBase {
   @Test
   public void testAppAdditionAndRemoval() throws Exception {
     ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
+    ApplicationPlacementContext apc =
+        new ApplicationPlacementContext("user1");
     AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
-            "user1");
+        new AppAddedSchedulerEvent(attemptId.getApplicationId(), "user1",
+            "user1", apc);
     scheduler.handle(appAddedEvent);
     AppAttemptAddedSchedulerEvent attemptAddedEvent =
         new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -149,7 +152,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
 
     // User1 submits one application
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+    createApplicationWithAMResource(appAttemptId, "user1", "user1", null);
 
     // The user1 queue should inherit the configurations from the root queue
     FSLeafQueue userQueue =
@@ -184,12 +187,14 @@ public class TestAppRunnability extends FairSchedulerTestBase {
     FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false);
     FSLeafQueue defaultQueue = queueManager.getLeafQueue("default", false);
 
+    // NOTE: placement is no longer done inside the scheduler, so fake it here.
+    // The scheduling request contains the fake placement.
     // Should get put into jerry
     createSchedulingRequest(1024, "jerry", "someuser");
     assertEquals(1, jerryQueue.getNumRunnableApps());
 
     // Should get forced into default
-    createSchedulingRequest(1024, "newqueue", "someuser");
+    createSchedulingRequest(1024, "default", "someuser");
     assertEquals(1, jerryQueue.getNumRunnableApps());
     assertEquals(1, defaultQueue.getNumRunnableApps());
 
@@ -200,7 +205,7 @@ public class TestAppRunnability extends FairSchedulerTestBase {
     assertEquals(2, defaultQueue.getNumRunnableApps());
 
     // Should get put into jerry because of user-as-default-queue
-    createSchedulingRequest(1024, "default", "jerry");
+    createSchedulingRequest(1024, "jerry", "jerry");
     assertEquals(2, jerryQueue.getNumRunnableApps());
     assertEquals(2, defaultQueue.getNumRunnableApps());
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
index b6ffaa5..81500db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
@@ -113,14 +114,18 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         1, Resources.createResource(4096, 4), 1, host);
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
-    NodeUpdateSchedulerEvent nodeUpdateEvent = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent nodeUpdateEvent =
+        new NodeUpdateSchedulerEvent(node1);
     scheduler.handle(nodeUpdateEvent);
 
     ApplicationAttemptId appAttemptId =
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     createMockRMApp(appAttemptId);
 
-    scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("queue11");
+    scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
+        "user11", false, placementCtx);
     scheduler.addApplicationAttempt(appAttemptId, false, false);
     List<ResourceRequest> ask = new ArrayList<>();
     ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
@@ -148,7 +153,8 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     scheduler.handle(nodeEvent2);
 
     // available resource
-    Assert.assertEquals(scheduler.getClusterResource().getMemorySize(), 16 * 1024);
+    Assert.assertEquals(scheduler.getClusterResource().getMemorySize(),
+        16 * 1024);
     Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16);
 
     // send application request
@@ -156,14 +162,17 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     createMockRMApp(appAttemptId);
 
-    scheduler.addApplication(appAttemptId.getApplicationId(),
-        "queue11", "user11", false);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("queue11");
+    scheduler.addApplication(appAttemptId.getApplicationId(), "queue11",
+        "user11", false, placementCtx);
     scheduler.addApplicationAttempt(appAttemptId, false, false);
     List<ResourceRequest> ask = new ArrayList<>();
     ResourceRequest request =
         createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
     ask.add(request);
-    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
+        NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
     FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@@ -174,10 +183,11 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
         createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
     ask.clear();
     ask.add(request);
-    scheduler.allocate(appAttemptId, ask, null, new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
+    scheduler.allocate(appAttemptId, ask, null, new ArrayList<>(), null, null,
+        NULL_UPDATE_REQUESTS);
     triggerSchedulingAttempt();
 
-    checkAppConsumption(app, Resources.createResource(2048,2));
+    checkAppConsumption(app, Resources.createResource(2048, 2));
 
     // 2 containers should be assigned to 2 nodes
     Set<NodeId> nodes = new HashSet<NodeId>();
@@ -353,8 +363,10 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     id11 = createAppAttemptId(1, 1);
     createMockRMApp(id11);
     priority = Priority.newInstance(priorityValue);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("root.queue1");
     scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
-        false);
+        false, placementCtx);
     scheduler.addApplicationAttempt(id11, false, false);
     fsAppAttempt = scheduler.getApplicationAttempt(id11);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 51ffd23..3719f74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.spy;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -298,8 +299,10 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
     assertEquals(0, clusterUsage.getVirtualCores());
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     createMockRMApp(id11);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("default");
     scheduler.addApplication(id11.getApplicationId(),
-            "default", "user1", false);
+            "default", "user1", false, placementCtx);
     scheduler.addApplicationAttempt(id11, false, false);
     assertNotNull(scheduler.getSchedulerApplications().get(id11.
             getApplicationId()));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
index c6ff5aa..32da3a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.Before;
@@ -29,23 +31,27 @@ import static org.mockito.Mockito.when;
 
 public class TestFSParentQueue {
 
-  private FairSchedulerConfiguration conf;
   private QueueManager queueManager;
 
   @Before
-  public void setUp() throws Exception {
-    conf = new FairSchedulerConfiguration();
+  public void setUp() {
+    FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+    RMContext rmContext = mock(RMContext.class);
+    SystemClock clock = SystemClock.getInstance();
+    PlacementManager placementManager = new PlacementManager();
     FairScheduler scheduler = mock(FairScheduler.class);
-    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
-    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    when(scheduler.getConfig()).thenReturn(conf);
     when(scheduler.getConf()).thenReturn(conf);
     when(scheduler.getResourceCalculator()).thenReturn(
         new DefaultResourceCalculator());
-    SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     queueManager = new QueueManager(scheduler);
     FSQueueMetrics.forQueue("root", null, true, conf);
-    queueManager.initialize(conf);
+    queueManager.initialize();
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1a60693..e76e7e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -85,7 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerEx
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -108,10 +110,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -426,8 +425,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   private void allocateAppAttempt(String queueName, int id, int memorySize) {
     ApplicationAttemptId id11 = createAppAttemptId(id, id);
     createMockRMApp(id11);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext(queueName);
+
     scheduler.addApplication(id11.getApplicationId(), queueName, "user1",
-        false);
+        false, placementCtx);
     scheduler.addApplicationAttempt(id11, false, false);
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ResourceRequest request1 =
@@ -1404,8 +1406,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
             createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     createMockRMApp(attemptId);
 
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("queue1");
     scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
-            false);
+            false, placementCtx);
     scheduler.addApplicationAttempt(attemptId, false, false);
     List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
     asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
@@ -1814,10 +1818,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // only default queue
     assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
 
-    // submit app with empty queue
+    // Submit app with empty queue
+    // Submit fails before we reach the placement check.
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
     AppAddedSchedulerEvent appAddedEvent =
-        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1");
+        new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "",
+            "user1");
     scheduler.handle(appAddedEvent);
 
     // submission rejected
@@ -1835,20 +1841,24 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // only default queue
     assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
 
-    // submit app with queue name (.A)
+    // Submit app with queue name (.A)
+    // Submit fails before we reach the placement check.
     ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
     AppAddedSchedulerEvent appAddedEvent1 =
-        new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A", "user1");
+        new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A",
+            "user1");
     scheduler.handle(appAddedEvent1);
     // submission rejected
     assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
     assertNull(scheduler.getSchedulerApp(appAttemptId1));
     assertEquals(0, resourceManager.getRMContext().getRMApps().size());
 
-    // submit app with queue name (A.)
+    // Submit app with queue name (A.)
+    // Submit fails before we reach the placement check.
     ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
     AppAddedSchedulerEvent appAddedEvent2 =
-        new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.", "user1");
+        new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.",
+            "user1");
     scheduler.handle(appAddedEvent2);
     // submission rejected
     assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
@@ -1856,9 +1866,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals(0, resourceManager.getRMContext().getRMApps().size());
 
     // submit app with queue name (A.B)
+    // Submit does not fail, so we must have a placement context.
     ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
     AppAddedSchedulerEvent appAddedEvent3 =
-        new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B", "user1");
+        new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B",
+            "user1", new ApplicationPlacementContext("A.B"));
     scheduler.handle(appAddedEvent3);
     // submission accepted
     assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@@ -1867,123 +1879,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   }
 
   @Test
-  public void testAssignToQueue() throws Exception {
-    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-    RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
-    
-    FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default", "asterix");
-    FSLeafQueue queue2 = scheduler.assignToQueue(rmApp2, "notdefault", "obelix");
-    
-    // assert FSLeafQueue's name is the correct name is the one set in the RMApp
-    assertEquals(rmApp1.getQueue(), queue1.getName());
-    assertEquals("root.asterix", rmApp1.getQueue());
-    assertEquals(rmApp2.getQueue(), queue2.getName());
-    assertEquals("root.notdefault", rmApp2.getQueue());
-  }
-
-  @Test
-  public void testAssignToBadDefaultQueue() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queuePlacementPolicy>");
-    out.println("<rule name=\"specified\" create=\"false\" />");
-    out.println("<rule name=\"default\" create=\"false\" />");
-    out.println("</queuePlacementPolicy>");
-    out.println("</allocations>");
-    out.close();
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-
-    try {
-      FSLeafQueue queue1 = scheduler.assignToQueue(rmApp1, "default",
-          "asterix");
-    } catch (IllegalStateException ise) {
-      fail("Bad queue placement policy terminal rule should not throw " +
-          "exception ");
-    }
-  }
-
-  @Test
-  public void testAssignToNonLeafQueueReturnsNull() throws Exception {
-    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true);
-    scheduler.getQueueManager().getLeafQueue("root.child2", true);
-
-    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-    RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
-
-    // Trying to assign to non leaf queue would return null
-    assertNull(scheduler.assignToQueue(rmApp1, "root.child1", "tintin"));
-    assertNotNull(scheduler.assignToQueue(rmApp2, "root.child2", "snowy"));
-  }
-  
-  @Test
-  public void testQueuePlacementWithPolicy() throws Exception {
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
-        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    ApplicationAttemptId appId;
-
-    List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
-    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
-    rules.add(new QueuePlacementRule.User().initialize(false, null));
-    rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
-    rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
-    rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
-        "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
-    Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
-    configuredQueues.put(FSQueueType.LEAF, queues);
-    configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
-    scheduler.getAllocationConfiguration().placementPolicy =
-        new QueuePlacementPolicy(rules, configuredQueues, conf);
-    appId = createSchedulingRequest(1024, "somequeue", "user1");
-    assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "user1");
-    assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "user3");
-    assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "user4");
-    assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "user5");
-    assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "otheruser");
-    assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
-    
-    // test without specified as first rule
-    rules = new ArrayList<QueuePlacementRule>();
-    rules.add(new QueuePlacementRule.User().initialize(false, null));
-    rules.add(new QueuePlacementRule.Specified().initialize(true, null));
-    rules.add(new QueuePlacementRule.Default().initialize(true, null));
-    scheduler.getAllocationConfiguration().placementPolicy =
-        new QueuePlacementPolicy(rules, configuredQueues, conf);
-    appId = createSchedulingRequest(1024, "somequeue", "user1");
-    assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "somequeue", "otheruser");
-    assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
-    appId = createSchedulingRequest(1024, "default", "otheruser");
-    assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName());
-  }
-
-  @Test
   public void testFairShareWithMinAlloc() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
@@ -2027,38 +1922,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
       }
     }
   }
-  
-  @Test
-  public void testNestedUserQueue() throws IOException {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
-        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"user1group\" type=\"parent\">");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queuePlacementPolicy>");
-    out.println("<rule name=\"specified\" create=\"false\" />");
-    out.println("<rule name=\"nestedUserQueue\">");
-    out.println("     <rule name=\"primaryGroup\" create=\"false\" />");
-    out.println("</rule>");
-    out.println("<rule name=\"default\" />");
-    out.println("</queuePlacementPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-    RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
-
-    FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
-        "user1");
-
-    assertEquals("root.user1group.user1", user1Leaf.getName());
-  }
 
   @Test
   public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
@@ -2228,7 +2091,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Submit one application
     ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
-    createApplicationWithAMResource(appAttemptId1, "default", "user1", null);
+    createApplicationWithAMResource(appAttemptId1, "user1", "user1", null);
     assertEquals(3072, scheduler.getQueueManager()
         .getLeafQueue("default", false).getSteadyFairShare().getMemorySize());
     assertEquals(3072, scheduler.getQueueManager()
@@ -2249,8 +2112,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // First ask, queue1 requests 1 large (minReqSize * 2).
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     createMockRMApp(id11);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("root.queue1");
     scheduler.addApplication(id11.getApplicationId(),
-        "root.queue1", "user1", false);
+        "root.queue1", "user1", false, placementCtx);
     scheduler.addApplicationAttempt(id11, false, false);
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ResourceRequest request1 = createResourceRequest(minReqSize * 2,
@@ -2262,8 +2127,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Second ask, queue2 requests 1 large.
     ApplicationAttemptId id21 = createAppAttemptId(2, 1);
     createMockRMApp(id21);
+    placementCtx = new ApplicationPlacementContext("root.queue2");
     scheduler.addApplication(id21.getApplicationId(),
-        "root.queue2", "user1", false);
+        "root.queue2", "user1", false, placementCtx);
     scheduler.addApplicationAttempt(id21, false, false);
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
     ResourceRequest request2 = createResourceRequest(2 * minReqSize,
@@ -2279,7 +2145,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ApplicationAttemptId id22 = createAppAttemptId(2, 2);
     createMockRMApp(id22);
     scheduler.addApplication(id22.getApplicationId(),
-        "root.queue2", "user1", false);
+        "root.queue2", "user1", false, placementCtx);
     scheduler.addApplicationAttempt(id22, false, false);
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
     ResourceRequest request4 = createResourceRequest(minReqSize,
@@ -2886,8 +2752,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     createMockRMApp(attemptId);
 
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("queue1");
     scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
-        false);
+        false, placementCtx);
     scheduler.addApplicationAttempt(attemptId, false, false);
     
     // 1 request with 2 nodes on the same rack. another request with 1 node on
@@ -2900,7 +2768,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
     asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
 
-    scheduler.allocate(attemptId, asks, null, new ArrayList<ContainerId>(), null,
+    scheduler.allocate(attemptId, asks, null, new ArrayList<>(), null,
         null, NULL_UPDATE_REQUESTS);
     
     // node 1 checks in
@@ -3202,10 +3070,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     submissionContext.setApplicationId(applicationId);
     submissionContext.setAMContainerSpec(clc);
     RMApp application =
-        new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, 
-          queue, submissionContext, scheduler, masterService,
+        new RMAppImpl(applicationId, resourceManager.getRMContext(), conf,
+            name, user, queue, submissionContext, scheduler, masterService,
           System.currentTimeMillis(), "YARN", null, null);
-    resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
+    resourceManager.getRMContext().getRMApps().
+        putIfAbsent(applicationId, application);
     application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
 
     final int MAX_TRIES=20;
@@ -3222,16 +3091,22 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     ApplicationAttemptId attId =
         ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
-    scheduler.addApplication(attId.getApplicationId(), queue, user, false);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext(queue);
+    scheduler.addApplication(attId.getApplicationId(), queue, user, false,
+        placementCtx);
 
     numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException ex) {ex.printStackTrace();}
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
       numTries++;
     }
-    assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
+    assertEquals(FinalApplicationStatus.FAILED,
+        application.getFinalApplicationStatus());
   }
 
   @Test
@@ -3845,7 +3720,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Queue queue1's fair share should be 0", 0, queue1
         .getFairShare().getMemorySize());
 
-    createSchedulingRequest(1 * 1024, "root.default", "user1");
+    createSchedulingRequest(1 * 1024, "default", "user1");
     scheduler.update();
     scheduler.handle(updateEvent);
 
@@ -4559,8 +4434,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ApplicationAttemptId id11 = createAppAttemptId(1, 1);
     createMockRMApp(id11);
 
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("root.queue1");
     scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
-        false);
+        false, placementCtx);
     scheduler.addApplicationAttempt(id11, false, false);
 
     List<ResourceRequest> ask1 = new ArrayList<>();
@@ -4573,7 +4450,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     String hostName = "127.0.0.1";
     RMNode node1 = MockNodes.newNodeInfo(1,
-      Resources.createResource(8 * 1024, 8), 1, hostName);
+        Resources.createResource(8 * 1024, 8), 1, hostName);
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
@@ -4611,12 +4488,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
-    List<QueuePlacementRule> rules =
-        scheduler.allocConf.placementPolicy.getRules();
+    List<PlacementRule> rules = scheduler.getRMContext()
+        .getQueuePlacementManager().getPlacementRules();
 
-    for (QueuePlacementRule rule : rules) {
-      if (rule instanceof Default) {
-        Default defaultRule = (Default) rule;
+    for (PlacementRule rule : rules) {
+      if (rule instanceof DefaultPlacementRule) {
+        DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
         assertNotNull(defaultRule.defaultQueueName);
       }
     }
@@ -4686,7 +4563,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ApplicationAttemptId appAttId2 =
         createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
     ApplicationAttemptId appAttId3 =
-        createSchedulingRequest(1024, 1, "default", "user1");
+        createSchedulingRequest(1024, 1, "user1", "user1");
     
     List<ApplicationAttemptId> apps =
         scheduler.getAppsInQueue("queue1.subqueue1");
@@ -4888,11 +4765,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
-    // The placement rule will add the app to the user based queue but the
+    // The placement rule should add it to the user based queue but the
     // passed in queue must exist.
+    ApplicationPlacementContext apc =
+        new ApplicationPlacementContext(testUser);
     AppAddedSchedulerEvent appAddedEvent =
         new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
-            testUser);
+            testUser, apc);
     scheduler.handle(appAddedEvent);
     AppAttemptAddedSchedulerEvent attemptAddedEvent =
         new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -4936,9 +4815,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.reinitialize(conf, resourceManager.getRMContext());
 
     ApplicationAttemptId attemptId = createAppAttemptId(1, 1);
+    ApplicationPlacementContext apc =
+        new ApplicationPlacementContext(testUser);
     AppAddedSchedulerEvent appAddedEvent =
         new AppAddedSchedulerEvent(attemptId.getApplicationId(), testUser,
-            testUser);
+            testUser, apc);
     scheduler.handle(appAddedEvent);
     AppAttemptAddedSchedulerEvent attemptAddedEvent =
         new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false);
@@ -4990,8 +4871,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // submit app with queue name "A"
     ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
+    ApplicationPlacementContext apc =
+        new ApplicationPlacementContext("A");
     AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
-        appAttemptId1.getApplicationId(), "A", "user1");
+        appAttemptId1.getApplicationId(), "A", "user1", apc);
     scheduler.handle(appAddedEvent1);
     // submission accepted
     assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
@@ -5008,9 +4891,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // submit app with queue name "A "
     ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
+    apc = new ApplicationPlacementContext("A ");
     AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
-        appAttemptId2.getApplicationId(), "A ", "user1");
-    scheduler.handle(appAddedEvent2);
+        appAttemptId2.getApplicationId(), "A ", "user1", apc);
+    try {
+      scheduler.handle(appAddedEvent2);
+      Assert.fail("Submit should have failed with InvalidQueueNameException");
+    } catch (InvalidQueueNameException iqne) {
+      // expected, ignore: rules should have filtered this out
+    }
     // submission rejected
     assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
     assertNull(scheduler.getSchedulerApplications().get(appAttemptId2.
@@ -5019,8 +4908,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // submit app with queue name "B.C"
     ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
+    apc = new ApplicationPlacementContext("B.C");
     AppAddedSchedulerEvent appAddedEvent3 = new AppAddedSchedulerEvent(
-        appAttemptId3.getApplicationId(), "B.C", "user1");
+        appAttemptId3.getApplicationId(), "B.C", "user1", apc);
     scheduler.handle(appAddedEvent3);
     // submission accepted
     assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
@@ -5037,9 +4927,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // submit app with queue name "A\u00a0" (non-breaking space)
     ApplicationAttemptId appAttemptId4 = createAppAttemptId(4, 1);
+    apc = new ApplicationPlacementContext("A\u00a0");
     AppAddedSchedulerEvent appAddedEvent4 = new AppAddedSchedulerEvent(
-        appAttemptId4.getApplicationId(), "A\u00a0", "user1");
-    scheduler.handle(appAddedEvent4);
+        appAttemptId4.getApplicationId(), "A\u00a0", "user1", apc);
+    try {
+      scheduler.handle(appAddedEvent4);
+    } catch (InvalidQueueNameException iqne) {
+      // expected, ignore: rules should have filtered this out
+    }
     // submission rejected
     assertEquals(3, scheduler.getQueueManager().getLeafQueues().size());
     assertNull(scheduler.getSchedulerApplications().get(appAttemptId4.
@@ -5075,17 +4970,17 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.init(conf);
     scheduler.start();
     scheduler.reinitialize(conf, resourceManager.getRMContext());
-    ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
-    createApplicationWithAMResource(appAttemptId, "default", "  user1", null);
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "root.user1", "  user1", null);
     assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getNumRunnableApps());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
         .getNumRunnableApps());
     assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
-        .get(appAttemptId.getApplicationId()).getQueue());
+        .get(attId1.getApplicationId()).getQueue());
 
     ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
-    createApplicationWithAMResource(attId2, "default", "user1  ", null);
+    createApplicationWithAMResource(attId2, "root.user1", "user1  ", null);
     assertEquals(2, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getNumRunnableApps());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -5094,7 +4989,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .get(attId2.getApplicationId()).getQueue());
 
     ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
-    createApplicationWithAMResource(attId3, "default", "user1", null);
+    createApplicationWithAMResource(attId3, "root.user1", "user1", null);
     assertEquals(3, scheduler.getQueueManager().getLeafQueue("user1", true)
         .getNumRunnableApps());
     assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
@@ -5526,8 +5421,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Create application attempt
     ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
     createMockRMApp(appAttemptId);
+    ApplicationPlacementContext placementCtx =
+        new ApplicationPlacementContext("root.queue1");
     scheduler.addApplication(appAttemptId.getApplicationId(), "root.queue1",
-        "user1", false);
+        "user1", false, placementCtx);
     scheduler.addApplicationAttempt(appAttemptId, false, false);
 
     // Create container request that goes to a specific node.
@@ -5612,7 +5509,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     ResourceRequest amReqs = ResourceRequest.newBuilder()
         .capability(Resource.newInstance(5 * GB, 3)).build();
-    createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
+    createApplicationWithAMResource(appAttemptId1, "root.queueA", "user1",
         Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
     scheduler.update();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
index f9fcf53..2785baa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -33,6 +35,8 @@ import java.util.Map;
 import static org.apache.hadoop.yarn.util.resource.ResourceUtils.MAXIMUM_ALLOCATION;
 import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestFairSchedulerWithMultiResourceTypes
     extends FairSchedulerTestBase {
@@ -44,6 +48,11 @@ public class TestFairSchedulerWithMultiResourceTypes
     scheduler = new FairScheduler();
     conf = createConfiguration();
     initResourceTypes(conf);
+    // since this runs outside of the normal context we need to set one
+    RMContext rmContext = mock(RMContext.class);
+    PlacementManager placementManager = new PlacementManager();
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    scheduler.setRMContext(rmContext);
   }
 
   @After
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
index 964ecf5..c692349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
@@ -27,10 +27,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.Before;
@@ -46,26 +46,27 @@ public class TestMaxRunningAppsEnforcer {
   private FairScheduler scheduler;
   
   @Before
-  public void setup() throws Exception {
-    Configuration conf = new Configuration();
+  public void setup() {
+    FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+    PlacementManager placementManager = new PlacementManager();
+    rmContext = mock(RMContext.class);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    when(rmContext.getEpoch()).thenReturn(0L);
     clock = new ControlledClock();
     scheduler = mock(FairScheduler.class);
-    when(scheduler.getConf()).thenReturn(
-        new FairSchedulerConfiguration(conf));
+    when(scheduler.getConf()).thenReturn(conf);
+    when(scheduler.getConfig()).thenReturn(conf);
     when(scheduler.getClock()).thenReturn(clock);
-    AllocationConfiguration allocConf = new AllocationConfiguration(
-        conf);
-    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     when(scheduler.getResourceCalculator()).thenReturn(
         new DefaultResourceCalculator());
-
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     queueManager = new QueueManager(scheduler);
-    queueManager.initialize(conf);
+    queueManager.initialize();
     userMaxApps = allocConf.userMaxApps;
     maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler);
     appNum = 0;
-    rmContext = mock(RMContext.class);
-    when(rmContext.getEpoch()).thenReturn(0L);
   }
   
   private FSAppAttempt addApp(FSLeafQueue queue, String user) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index 9d8c960..d888d23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -37,35 +38,43 @@ import org.mockito.Mockito;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+/**
+ * Test that the {@link FairScheduler} queue manager correctly manages queue
+ * hierarchies (create, delete and type changes).
+ */
 public class TestQueueManager {
-  private FairSchedulerConfiguration conf;
   private QueueManager queueManager;
   private FairScheduler scheduler;
-  
+
   @Before
-  public void setUp() throws Exception {
-    conf = new FairSchedulerConfiguration();
+  public void setUp() {
+    PlacementManager placementManager = new PlacementManager();
+    FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    SystemClock clock = SystemClock.getInstance();
+
     scheduler = mock(FairScheduler.class);
+    when(scheduler.getConf()).thenReturn(conf);
+    when(scheduler.getConfig()).thenReturn(conf);
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    when(scheduler.getResourceCalculator()).thenReturn(
+        new DefaultResourceCalculator());
+    when(scheduler.getClock()).thenReturn(clock);
 
-    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+    AllocationConfiguration allocConf =
+        new AllocationConfiguration(scheduler);
+    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
 
     // Set up some queues to test default child max resource inheritance
     allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test");
     allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA");
     allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB");
 
-    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
-    when(scheduler.getConf()).thenReturn(conf);
-    when(scheduler.getResourceCalculator()).thenReturn(
-        new DefaultResourceCalculator());
-
-    SystemClock clock = SystemClock.getInstance();
-
-    when(scheduler.getClock()).thenReturn(clock);
     queueManager = new QueueManager(scheduler);
 
     FSQueueMetrics.forQueue("root", null, true, conf);
-    queueManager.initialize(conf);
+    queueManager.initialize();
     queueManager.updateAllocationConfiguration(allocConf);
   }
 
@@ -118,7 +127,8 @@ public class TestQueueManager {
    */
   @Test
   public void testReloadTurnsLeafToParentWithNoLeaf() {
-    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+    AllocationConfiguration allocConf =
+        new AllocationConfiguration(scheduler);
     // Create a leaf queue1
     allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
     queueManager.updateAllocationConfiguration(allocConf);
@@ -130,7 +140,7 @@ public class TestQueueManager {
     FSLeafQueue q1 = queueManager.getLeafQueue("queue1", false);
     ApplicationId appId = ApplicationId.newInstance(0, 0);
     q1.addAssignedApp(appId);
-    allocConf = new AllocationConfiguration(conf);
+    allocConf = new AllocationConfiguration(scheduler);
     allocConf.configuredQueues.get(FSQueueType.PARENT)
         .add("root.queue1");
 
@@ -169,7 +179,8 @@ public class TestQueueManager {
 
   private void updateConfiguredLeafQueues(QueueManager queueMgr,
                                           String... confLeafQueues) {
-    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+    AllocationConfiguration allocConf =
+        new AllocationConfiguration(scheduler);
     allocConf.configuredQueues.get(FSQueueType.LEAF)
         .addAll(Sets.newHashSet(confLeafQueues));
     queueMgr.updateAllocationConfiguration(allocConf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
index 3fe9ce3..29c9d70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
@@ -18,44 +18,82 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.nio.charset.StandardCharsets;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
+/**
+ * Tests for the queue placement policy for the {@link FairScheduler}.
+ */
 public class TestQueuePlacementPolicy {
-  private final static Configuration conf = new Configuration();
-  private Map<FSQueueType, Set<String>> configuredQueues;
-  
+  private final static FairSchedulerConfiguration CONF =
+      new FairSchedulerConfiguration();
+  // Base setup needed, policy is an intermediate object
+  private PlacementManager placementManager;
+  private FairScheduler scheduler;
+  private QueueManager queueManager;
+
+  // Locals used in each assignment
+  private ApplicationSubmissionContext asc;
+  private ApplicationPlacementContext context;
+
   @BeforeClass
   public static void setup() {
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+    CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
   }
-  
+
   @Before
   public void initTest() {
-    configuredQueues = new HashMap<FSQueueType, Set<String>>();
-    for (FSQueueType type : FSQueueType.values()) {
-      configuredQueues.put(type, new HashSet<String>());
-    }
+    SystemClock clock = SystemClock.getInstance();
+    RMContext rmContext = mock(RMContext.class);
+    placementManager = new PlacementManager();
+    scheduler = mock(FairScheduler.class);
+    when(scheduler.getClock()).thenReturn(clock);
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    when(scheduler.getConfig()).thenReturn(CONF);
+    when(scheduler.getConf()).thenReturn(CONF);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
+    queueManager = new QueueManager(scheduler);
+    queueManager.initialize();
+    when(scheduler.getQueueManager()).thenReturn(queueManager);
+  }
+
+  @After
+  public void cleanTest() {
+    placementManager = null;
+    queueManager = null;
+    scheduler = null;
   }
 
   @Test
@@ -65,15 +103,18 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='specified' />");
     sb.append("  <rule name='user' />");
     sb.append("</queuePlacementPolicy>");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.specifiedq",
-        policy.assignAppToQueue("specifiedq", "someuser"));
-    assertEquals("root.someuser",
-        policy.assignAppToQueue("default", "someuser"));
-    assertEquals("root.otheruser",
-        policy.assignAppToQueue("default", "otheruser"));
+    createPolicy(sb.toString());
+
+    asc = newAppSubmissionContext("specifiedq");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertEquals("root.specifiedq", context.getQueue());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertEquals("root.someuser", context.getQueue());
+    context = placementManager.placeApplication(asc, "otheruser");
+    assertEquals("root.otheruser", context.getQueue());
   }
-  
+
   @Test
   public void testNoCreate() throws Exception {
     StringBuffer sb = new StringBuffer();
@@ -82,15 +123,24 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='user' create=\"false\" />");
     sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
-    
-    configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
-    assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
-    assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser"));
-    assertEquals("root.default", policy.assignAppToQueue("default", "otheruser"));
+    createPolicy(sb.toString());
+
+    createQueue(FSQueueType.LEAF, "root.someuser");
+
+    asc = newAppSubmissionContext("specifiedq");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertEquals("root.specifiedq", context.getQueue());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertEquals("root.someuser", context.getQueue());
+    asc = newAppSubmissionContext("specifiedq");
+    context = placementManager.placeApplication(asc, "otheruser");
+    assertEquals("root.specifiedq", context.getQueue());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "otheruser");
+    assertEquals("root.default", context.getQueue());
   }
-  
+
   @Test
   public void testSpecifiedThenReject() throws Exception {
     StringBuffer sb = new StringBuffer();
@@ -98,94 +148,173 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='specified' />");
     sb.append("  <rule name='reject' />");
     sb.append("</queuePlacementPolicy>");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.specifiedq",
-        policy.assignAppToQueue("specifiedq", "someuser"));
-    assertEquals(null, policy.assignAppToQueue("default", "someuser"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("specifiedq");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertEquals("root.specifiedq", context.getQueue());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "someuser");
+    assertNull("Assignment should have been rejected and was not", context);
   }
-  
-  @Test (expected = AllocationConfigurationException.class)
-  public void testOmittedTerminalRule() throws Exception {
+
+  @Test
+  public void testOmittedTerminalRule()  {
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='specified' />");
     sb.append("  <rule name='user' create=\"false\" />");
     sb.append("</queuePlacementPolicy>");
-    parse(sb.toString());
+    assertIfExceptionThrown(sb);
   }
-  
-  @Test (expected = AllocationConfigurationException.class)
-  public void testTerminalRuleInMiddle() throws Exception {
+
+  @Test
+  public void testTerminalRuleInMiddle() {
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='specified' />");
     sb.append("  <rule name='default' />");
     sb.append("  <rule name='user' />");
     sb.append("</queuePlacementPolicy>");
-    parse(sb.toString());
+    assertIfExceptionThrown(sb);
   }
-  
+
   @Test
-  public void testTerminals() throws Exception {
-    // Should make it through without an exception
+  public void testTerminals() {
+    // The default rule is no longer considered terminal when the create flag
+    // is false. The throw now happens when configuring, not when assigning
+    // the application
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='secondaryGroupExistingQueue' create='true'/>");
     sb.append("  <rule name='default' queue='otherdefault' create='false'/>");
     sb.append("</queuePlacementPolicy>");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    try {
-      policy.assignAppToQueue("root.otherdefault", "user1");
-      fail("Expect exception from having default rule with create=\'false\'");
-    } catch (IllegalStateException se) {
-    }
+    assertIfExceptionThrown(sb);
   }
-  
+
   @Test
   public void testDefaultRuleWithQueueAttribute() throws Exception {
     // This test covers the use case where we would like default rule
     // to point to a different queue by default rather than root.default
-    configuredQueues.get(FSQueueType.LEAF).add("root.someDefaultQueue");
+    createQueue(FSQueueType.LEAF, "root.someDefaultQueue");
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='specified' create='false' />");
     sb.append("  <rule name='default' queue='root.someDefaultQueue'/>");
     sb.append("</queuePlacementPolicy>");
 
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.someDefaultQueue",
-        policy.assignAppToQueue("root.default", "user1"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.someDefaultQueue", context.getQueue());
   }
-  
+
   @Test
   public void testNestedUserQueueParsingErrors() {
     // No nested rule specified in hierarchical user queue
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
-    sb.append("  <rule name='specified' />");
     sb.append("  <rule name='nestedUserQueue'/>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
     assertIfExceptionThrown(sb);
 
-    // Specified nested rule is not a QueuePlacementRule
+    // Specified nested rule is not a FSPlacementRule
     sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
-    sb.append("  <rule name='specified' />");
     sb.append("  <rule name='nestedUserQueue'>");
-    sb.append("       <rule name='unknownRule'/>");
+    sb.append("    <rule name='unknownRule'/>");
     sb.append("  </rule>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
     assertIfExceptionThrown(sb);
+
+    // Parent rule is a rule that cannot be a parent: reject or nestedUserQueue
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='nestedUserQueue'>");
+    sb.append("    <rule name='reject'/>");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    assertIfExceptionThrown(sb);
+
+    // If the parent rule does not have the create flag the nested rule is not
+    // terminal
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='nestedUserQueue'>");
+    sb.append("    <rule name='primaryGroup' create='false'/>");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    assertIfExceptionThrown(sb);
+  }
+
+  @Test
+  public void testMultipleParentRules() throws Exception {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='nestedUserQueue'>");
+    sb.append("    <rule name='primaryGroup'/>");
+    sb.append("    <rule name='default'/>");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    createPolicy(sb.toString());
+    PlacementRule nested = placementManager.getPlacementRules().get(0);
+    if (nested instanceof UserPlacementRule) {
+      PlacementRule parent = ((FSPlacementRule)nested).getParentRule();
+      assertTrue("Nested rule should have been Default rule",
+          parent instanceof DefaultPlacementRule);
+    } else {
+      fail("Policy parsing failed: rule with multiple parents not set");
+    }
+  }
+
+  @Test
+  public void testBrokenRules() throws Exception {
+    // broken rule should fail configuring
+    StringBuffer sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule />");
+    sb.append("</queuePlacementPolicy>");
+
+    assertIfExceptionThrown(sb);
+
+    // policy without any rules: the policy is ignored
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <notarule />");
+    sb.append("</queuePlacementPolicy>");
+
+    createPolicy(sb.toString());
+
+    // broken rule should fail configuring
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='user'>");
+    sb.append("    <rule />");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    assertIfExceptionThrown(sb);
+
+    // parent rule not set to something known: no parent rule is required
+    // here; the required case (nestedUserQueue only) is tested earlier
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='user'>");
+    sb.append("    <notarule />");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    createPolicy(sb.toString());
   }
 
   private void assertIfExceptionThrown(StringBuffer sb) {
     Throwable th = null;
     try {
-      parse(sb.toString());
+      createPolicy(sb.toString());
     } catch (Exception e) {
       th = e;
     }
@@ -193,6 +322,17 @@ public class TestQueuePlacementPolicy {
     assertTrue(th instanceof AllocationConfigurationException);
   }
 
+  private void assertIfExceptionThrown(String user) {
+    Throwable th = null;
+    try {
+      placementManager.placeApplication(asc, user);
+    } catch (Exception e) {
+      th = e;
+    }
+
+    assertTrue(th instanceof YarnException);
+  }
+
   @Test
   public void testNestedUserQueueParsing() throws Exception {
     StringBuffer sb = new StringBuffer();
@@ -201,17 +341,9 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='nestedUserQueue'>");
     sb.append("       <rule name='primaryGroup'/>");
     sb.append("  </rule>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
-    Throwable th = null;
-    try {
-      parse(sb.toString());
-    } catch (Exception e) {
-      th = e;
-    }
-
-    assertNull(th);
+    createPolicy(sb.toString());
   }
 
   @Test
@@ -222,24 +354,26 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='nestedUserQueue'>");
     sb.append("       <rule name='primaryGroup'/>");
     sb.append("  </rule>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
     // User queue would be created under primary group queue
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.user1group.user1",
-        policy.assignAppToQueue("root.default", "user1"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.user1group.user1", context.getQueue());
     // Other rules above and below hierarchical user queue rule should work as
     // usual
-    configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
+    createQueue(FSQueueType.LEAF, "root.specifiedq");
     // test if specified rule(above nestedUserQueue rule) works ok
-    assertEquals("root.specifiedq",
-        policy.assignAppToQueue("root.specifiedq", "user2"));
-
-    // test if default rule(below nestedUserQueue rule) works
-    configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
-    assertEquals("root.default",
-        policy.assignAppToQueue("root.default", "user3"));
+    asc = newAppSubmissionContext("root.specifiedq");
+    context = placementManager.placeApplication(asc, "user2");
+    assertEquals("root.specifiedq", context.getQueue());
+
+    // Submit should fail if we cannot create the queue
+    createQueue(FSQueueType.LEAF, "root.user3group");
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user3");
+    assertNull("Submission should have failed and did not", context);
   }
 
   @Test
@@ -253,18 +387,18 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
-    QueuePlacementPolicy policy = parse(sb.toString());
+    createPolicy(sb.toString());
 
     // Should return root.default since primary group 'root.user1group' is not
     // configured
-    assertEquals("root.default",
-        policy.assignAppToQueue("root.default", "user1"));
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.default", context.getQueue());
 
     // Let's configure primary group and check if user queue is created
-    configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
-    policy = parse(sb.toString());
-    assertEquals("root.user1group.user1",
-        policy.assignAppToQueue("root.default", "user1"));
+    createQueue(FSQueueType.PARENT, "root.user1group");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.user1group.user1", context.getQueue());
 
     // Both Primary group and nestedUserQueue rule has create='false'
     sb = new StringBuffer();
@@ -277,16 +411,16 @@ public class TestQueuePlacementPolicy {
 
     // Should return root.default since primary group and user queue for user 2
     // are not configured.
-    assertEquals("root.default",
-        policy.assignAppToQueue("root.default", "user2"));
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user2");
+    assertEquals("root.default", context.getQueue());
 
     // Now configure both primary group and the user queue for user2
-    configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
-    configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
-    policy = parse(sb.toString());
+    createQueue(FSQueueType.LEAF, "root.user2group.user2");
 
-    assertEquals("root.user2group.user2",
-        policy.assignAppToQueue("root.default", "user2"));
+    // Try placing the same app again
+    context = placementManager.placeApplication(asc, "user2");
+    assertEquals("root.user2group.user2", context.getQueue());
   }
 
   @Test
@@ -299,17 +433,18 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
-    QueuePlacementPolicy policy = parse(sb.toString());
+    createPolicy(sb.toString());
     // Should return root.default since secondary groups are not configured
-    assertEquals("root.default",
-        policy.assignAppToQueue("root.default", "user1"));
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.default", context.getQueue());
 
     // configure secondary group for user1
-    configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
-    policy = parse(sb.toString());
+    createQueue(FSQueueType.PARENT, "root.user1subgroup1");
+    createPolicy(sb.toString());
     // user queue created should be created under secondary group
-    assertEquals("root.user1subgroup1.user1",
-        policy.assignAppToQueue("root.default", "user1"));
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.user1subgroup1.user1", context.getQueue());
   }
 
   @Test
@@ -325,33 +460,66 @@ public class TestQueuePlacementPolicy {
     sb.append("</queuePlacementPolicy>");
 
     // Let's create couple of parent queues
-    configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
-    configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
-
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.parent1.user1",
-        policy.assignAppToQueue("root.parent1", "user1"));
-    assertEquals("root.parent2.user2",
-        policy.assignAppToQueue("root.parent2", "user2"));
+    createQueue(FSQueueType.PARENT, "root.parent1");
+    createQueue(FSQueueType.PARENT, "root.parent2");
+
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("root.parent1");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.parent1.user1", context.getQueue());
+    asc = newAppSubmissionContext("root.parent2");
+    context = placementManager.placeApplication(asc, "user2");
+    assertEquals("root.parent2.user2", context.getQueue());
   }
-  
+
   @Test
   public void testNestedUserQueueDefaultRule() throws Exception {
     // This test covers the use case where we would like user queues to be
     // created under a default parent queue
-    configuredQueues.get(FSQueueType.PARENT).add("root.parentq");
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='specified' create='false' />");
     sb.append("  <rule name='nestedUserQueue'>");
-    sb.append("       <rule name='default' queue='root.parentq'/>");
+    sb.append("       <rule name='default' queue='root.parent'/>");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.parent.user1", context.getQueue());
+
+    // Same as above but now with the create flag false for the parent
+    createQueue(FSQueueType.PARENT, "root.parent");
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='specified' create='false' />");
+    sb.append("  <rule name='nestedUserQueue'>");
+    sb.append("    <rule name='default' queue='root.parent' create='false'/>");
     sb.append("  </rule>");
     sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.parentq.user1",
-        policy.assignAppToQueue("root.default", "user1"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.parent.user1", context.getQueue());
+
+    // Parent queue returned is already a configured LEAF, should fail and the
+    // context is null.
+    createQueue(FSQueueType.LEAF, "root.parent");
+    sb = new StringBuffer();
+    sb.append("<queuePlacementPolicy>");
+    sb.append("  <rule name='specified' create='false' />");
+    sb.append("  <rule name='nestedUserQueue'>");
+    sb.append("       <rule name='default' queue='root.parent' />");
+    sb.append("  </rule>");
+    sb.append("</queuePlacementPolicy>");
+
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertNull("Submission should have failed and did not", context);
   }
 
   @Test
@@ -361,9 +529,10 @@ public class TestQueuePlacementPolicy {
     sb.append("<queuePlacementPolicy>");
     sb.append("  <rule name='user' />");
     sb.append("</queuePlacementPolicy>");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.first_dot_last",
-        policy.assignAppToQueue("default", "first.last"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "first.last");
+    assertEquals("root.first_dot_last", context.getQueue());
 
     sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
@@ -371,11 +540,14 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='nestedUserQueue'>");
     sb.append("       <rule name='default'/>");
     sb.append("  </rule>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
-    policy = parse(sb.toString());
-    assertEquals("root.default.first_dot_last",
-        policy.assignAppToQueue("root.default", "first.last"));
+    // specified create is false, bypass the rule
+    // default rule has create which requires a PARENT queue: remove the LEAF
+    queueManager.removeLeafQueue("root.default");
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "first_dot_last");
+    assertEquals("root.default.first_dot_last", context.getQueue());
   }
 
   @Test
@@ -386,22 +558,22 @@ public class TestQueuePlacementPolicy {
     sb.append("  <rule name='nestedUserQueue'>");
     sb.append("       <rule name='primaryGroup'/>");
     sb.append("  </rule>");
-    sb.append("  <rule name='default' />");
     sb.append("</queuePlacementPolicy>");
 
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+    CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         PeriodGroupsMapping.class, GroupMappingServiceProvider.class);
     // User queue would be created under primary group queue, and the period
     // in the group name should be converted into _dot_
-    QueuePlacementPolicy policy = parse(sb.toString());
-    assertEquals("root.user1_dot_group.user1",
-        policy.assignAppToQueue("root.default", "user1"));
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("default");
+    context = placementManager.placeApplication(asc, "user1");
+    assertEquals("root.user1_dot_group.user1", context.getQueue());
 
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+    CONF.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
         SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
   }
 
-  @Test(expected=IOException.class)
+  @Test
   public void testEmptyGroupsPrimaryGroupRule() throws Exception {
     StringBuffer sb = new StringBuffer();
     sb.append("<queuePlacementPolicy>");
@@ -410,20 +582,67 @@ public class TestQueuePlacementPolicy {
     sb.append("</queuePlacementPolicy>");
 
     // Add a static mapping that returns empty groups for users
-    conf.setStrings(CommonConfigurationKeys
+    CONF.setStrings(CommonConfigurationKeys
         .HADOOP_USER_GROUP_STATIC_OVERRIDES, "emptygroupuser=");
-    QueuePlacementPolicy policy = parse(sb.toString());
-    policy.assignAppToQueue(null, "emptygroupuser");
+    createPolicy(sb.toString());
+    asc = newAppSubmissionContext("root.fake");
+    assertIfExceptionThrown("emptygroupuser");
+  }
+
+  @Test
+  public void testSpecifiedQueueWithSpaces() throws Exception {
+    // Submit to queue names ending in a space and a non-breaking space;
+    // assertIfExceptionThrown presumably expects placement to throw.
+    String policyXml = "<queuePlacementPolicy>"
+        + "  <rule name='specified'/>"
+        + "  <rule name='default'/>"
+        + "</queuePlacementPolicy>";
+    createPolicy(policyXml);
+
+    for (String queueName : new String[] {"A ", "A\u00a0"}) {
+      asc = newAppSubmissionContext(queueName);
+      assertIfExceptionThrown("user1");
+    }
   }
 
-  private QueuePlacementPolicy parse(String str) throws Exception {
+  private void createPolicy(String str)
+      throws AllocationConfigurationException {
     // Read and parse the allocations file.
-    DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
-        .newInstance();
-    docBuilderFactory.setIgnoringComments(true);
-    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(IOUtils.toInputStream(str));
-    Element root = doc.getDocumentElement();
-    return QueuePlacementPolicy.fromXml(root, configuredQueues, conf);
+    Element root = null;
+    try {
+      DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+          .newInstance();
+      docBuilderFactory.setIgnoringComments(true);
+      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+      Document doc = builder.parse(IOUtils.toInputStream(str,
+          StandardCharsets.UTF_8));
+      root = doc.getDocumentElement();
+    } catch (Exception ex) {
+      // XML parsing is not under test: ignore the error, root stays null
+      // and the fromXml call below is expected to fail on it.
+    }
+    QueuePlacementPolicy.fromXml(root, scheduler);
+  }
+
+  private ApplicationSubmissionContext newAppSubmissionContext(String queue) {
+    // Every value is fixed except the queue name supplied by the caller.
+    ContainerLaunchContext am =
+        ContainerLaunchContext.newInstance(null, null, null, null, null, null);
+    Resource res = Resource.newInstance(1, 1);
+    ApplicationId id = ApplicationId.newInstance(1L, 1);
+    return ApplicationSubmissionContext.newInstance(id, "test", queue,
+        Priority.UNDEFINED, am, false, false, 1, res, "testing");
+  }
+
+  private void createQueue(FSQueueType type, String name) {
+    // Create a queue as if it is in the config.
+    FSQueue queue = queueManager.createQueue(name, type);
+    assertNotNull("Queue not created", queue);
+    // Mark the queue and each dynamic ancestor as non dynamic; the null
+    // check ends the walk at the top (e.g. name is root) instead of an NPE.
+    do {
+      queue.setDynamic(false);
+      queue = queue.parent;
+    } while (queue != null && queue.isDynamic());
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index 9c6b0f4..e2be0d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -30,6 +30,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -40,6 +42,8 @@ import org.junit.Test;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestSchedulingPolicy {
   private static final Logger LOG =
@@ -53,6 +57,11 @@ public class TestSchedulingPolicy {
   public void setUp() throws Exception {
     scheduler = new FairScheduler();
     conf = new FairSchedulerConfiguration();
+    // runs outside the normal RM context: mock one with a PlacementManager
+    RMContext rmContext = mock(RMContext.class);
+    PlacementManager placementManager = new PlacementManager();
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    scheduler.setRMContext(rmContext);
   }
 
   public void testParseSchedulingPolicy()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
index 83b7e41..454609b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -37,19 +39,25 @@ import static org.mockito.Mockito.when;
 public class TestFairSchedulerQueueInfo {
 
   @Test
-  public void testEmptyChildQueues() throws Exception {
-    FairSchedulerConfiguration conf = new FairSchedulerConfiguration();
+  public void testEmptyChildQueues() {
+    FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
+    RMContext rmContext = mock(RMContext.class);
+    PlacementManager placementManager = new PlacementManager();
+    SystemClock clock = SystemClock.getInstance();
     FairScheduler scheduler = mock(FairScheduler.class);
-    AllocationConfiguration allocConf = new AllocationConfiguration(conf);
-    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
-    when(scheduler.getConf()).thenReturn(conf);
-    when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
+    when(scheduler.getConf()).thenReturn(fsConf);
+    when(scheduler.getConfig()).thenReturn(fsConf);
+    when(scheduler.getRMContext()).thenReturn(rmContext);
+    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
+    when(scheduler.getClusterResource()).thenReturn(
+        Resource.newInstance(1, 1));
     when(scheduler.getResourceCalculator()).thenReturn(
         new DefaultResourceCalculator());
-    SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
+    AllocationConfiguration allocConf = new AllocationConfiguration(scheduler);
+    when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
     QueueManager queueManager = new QueueManager(scheduler);
-    queueManager.initialize(conf);
+    queueManager.initialize();
 
     FSQueue testQueue = queueManager.getLeafQueue("test", true);
     FairSchedulerQueueInfo queueInfo =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org