You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/15 20:02:08 UTC

[02/19] hadoop git commit: YARN-3635. Refactored current queue mapping implementation in CapacityScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan

YARN-3635. Refactored current queue mapping implementation in CapacityScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5468baa8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5468baa8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5468baa8

Branch: refs/heads/YARN-1197
Commit: 5468baa80aa2a3e2a02e9a902deebafd734daf23
Parents: d777757
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 15 15:39:20 2015 +0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Sep 15 15:39:20 2015 +0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/RMActiveServiceContext.java |  16 +-
 .../server/resourcemanager/RMAppManager.java    |   9 +
 .../yarn/server/resourcemanager/RMContext.java  |   5 +
 .../server/resourcemanager/RMContextImpl.java   |  12 +-
 .../placement/PlacementManager.java             |  95 +++++++++
 .../placement/PlacementRule.java                |  55 +++++
 .../UserGroupMappingPlacementRule.java          | 164 +++++++++++++++
 .../scheduler/capacity/CapacityScheduler.java   | 126 ++++--------
 .../CapacitySchedulerConfiguration.java         |  32 +--
 .../server/resourcemanager/TestAppManager.java  |  54 ++++-
 .../TestUserGroupMappingPlacementRule.java      |  89 ++++++++
 .../scheduler/capacity/TestQueueMappings.java   | 203 +++++--------------
 13 files changed, 584 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 766d4ef..cff5205 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -455,6 +455,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
     container allocation logic. (Wangda Tan via jianhe)
 
+    YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
+    to use a generic PlacementManager framework. (Wangda Tan via jianhe)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 1abb14e..c71323f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -99,9 +100,10 @@ public class RMActiveServiceContext {
   private long schedulerRecoveryWaitTime = 0;
   private boolean printLog = true;
   private boolean isSchedulerReady = false;
+  private PlacementManager queuePlacementManager = null;
 
   public RMActiveServiceContext() {
-
+    queuePlacementManager = new PlacementManager();
   }
 
   @Private
@@ -424,4 +426,16 @@ public class RMActiveServiceContext {
   public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
     return systemCredentials;
   }
+  
+  /**
+   * @return the queue placement manager held by this context; the
+   *         constructor installs a default instance, and
+   *         {@link #setQueuePlacementManager} can replace it
+   */
+  @Private
+  @Unstable
+  public PlacementManager getQueuePlacementManager() {
+    return queuePlacementManager;
+  }
+  
+  /**
+   * Replace the queue placement manager held by this context.
+   *
+   * @param placementMgr the placement manager to store; it is stored as
+   *          given, with no null check
+   */
+  @Private
+  @Unstable
+  public void setQueuePlacementManager(PlacementManager placementMgr) {
+    this.queuePlacementManager = placementMgr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6fd1838..703ec1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -326,6 +326,15 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private RMAppImpl createAndPopulateNewRMApp(
       ApplicationSubmissionContext submissionContext, long submitTime,
       String user, boolean isRecovery) throws YarnException {
+    // Do queue mapping
+    if (!isRecovery) {
+      if (rmContext.getQueuePlacementManager() != null) {
+        // We only do queue mapping when it's a new application
+        rmContext.getQueuePlacementManager().placeApplication(
+            submissionContext, user);
+      }
+    }
+    
     ApplicationId applicationId = submissionContext.getApplicationId();
     ResourceRequest amReq =
         validateAndCreateResourceRequest(submissionContext, isRecovery);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index bc50268..b64c834 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -124,4 +125,8 @@ public interface RMContext {
   boolean isSchedulerReadyForAllocatingContainers();
   
   Configuration getYarnConfiguration();
+  
+  PlacementManager getQueuePlacementManager();
+  
+  void setQueuePlacementManager(PlacementManager placementMgr);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d6d573d..840cea7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext {
    * individual fields.
    */
   public RMContextImpl() {
-
   }
 
   @VisibleForTesting
@@ -438,4 +438,14 @@ public class RMContextImpl implements RMContext {
   public void setYarnConfiguration(Configuration yarnConfiguration) {
     this.yarnConfiguration=yarnConfiguration;
   }
+
+  /** Returns the placement manager held by the active service context. */
+  @Override
+  public PlacementManager getQueuePlacementManager() {
+    return this.activeServiceContext.getQueuePlacementManager();
+  }
+  
+  /** Stores the given placement manager in the active service context. */
+  @Override
+  public void setQueuePlacementManager(PlacementManager placementMgr) {
+    this.activeServiceContext.setQueuePlacementManager(placementMgr);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
new file mode 100644
index 0000000..43a4deb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Holds an ordered list of {@link PlacementRule}s and uses them to decide
+ * which queue a submitted application is placed in.
+ *
+ * <p>
+ * The rule list is swapped by {@link #updateRules(List)} under the write lock
+ * and consulted by {@link #placeApplication} under the read lock.
+ * </p>
+ */
+public class PlacementManager {
+  private static final Log LOG = LogFactory.getLog(PlacementManager.class);
+
+  List<PlacementRule> rules;
+  ReadLock readLock;
+  WriteLock writeLock;
+
+  public PlacementManager() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  /**
+   * Replace the current rule list.
+   *
+   * @param rules rules to consult, in list order, for subsequent placements;
+   *          the list is stored as given, not copied
+   */
+  public void updateRules(List<PlacementRule> rules) {
+    // Acquire before entering try: if lock() itself fails, the finally block
+    // must not unlock a lock this thread never obtained.
+    writeLock.lock();
+    try {
+      this.rules = rules;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Decide the queue for an application and record it in the submission
+   * context.
+   *
+   * <p>
+   * Rules are asked in list order and the first non-null answer wins. When no
+   * rules are installed this method does nothing. When no rule gives an
+   * answer, the queue already named in the submission context is kept.
+   * </p>
+   *
+   * @param asc application submission context; its queue is updated in place
+   *          when a rule selects a different queue
+   * @param user name of the submitting user, passed through to each rule
+   * @throws YarnException if a rule fails, or if no rule gives an answer and
+   *           the submission context names no queue either
+   */
+  public void placeApplication(ApplicationSubmissionContext asc, String user)
+      throws YarnException {
+    readLock.lock();
+    try {
+      if (null == rules || rules.isEmpty()) {
+        return;
+      }
+
+      String newQueueName = null;
+      for (PlacementRule rule : rules) {
+        newQueueName = rule.getQueueForApp(asc, user);
+        if (newQueueName != null) {
+          break;
+        }
+      }
+
+      if (null == newQueueName) {
+        // Failed to get where to place application
+        if (null == asc.getQueue()) {
+          String msg = "Failed to get where to place application="
+              + asc.getApplicationId();
+          LOG.error(msg);
+          throw new YarnException(msg);
+        }
+        // No rule decided: keep the requested queue instead of overwriting
+        // it with null.
+        return;
+      }
+
+      // Set it to ApplicationSubmissionContext
+      if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
+        LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
+            + newQueueName + ", original queue=" + asc.getQueue());
+        asc.setQueue(newQueueName);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * @return the rule list as last passed to {@link #updateRules(List)}, or
+   *         null if it was never set; read without taking the lock
+   */
+  @VisibleForTesting
+  public List<PlacementRule> getPlacementRules() {
+    return rules;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
new file mode 100644
index 0000000..47dc48a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+/**
+ * Base class for a single rule that can choose the queue of an application.
+ * Rules are consulted by a {@link PlacementManager}.
+ */
+public abstract class PlacementRule {
+  /**
+   * @return the fully qualified class name of the concrete rule
+   */
+  public String getName() {
+    return getClass().getName();
+  }
+
+  /**
+   * Hook for rule-specific setup. The base implementation does nothing.
+   *
+   * @param parameters rule parameters
+   * @param rmContext resource manager context
+   * @throws YarnException declared for overriding rules; never thrown here
+   */
+  public void initialize(Map<String, String> parameters, RMContext rmContext)
+      throws YarnException {
+    // Intentionally empty: subclasses override when they need setup.
+  }
+
+  /**
+   * Choose the queue for an application.
+   *
+   * @param asc application submission context
+   * @param user name of the submitting user
+   *
+   * @return <p>
+   *         a non-null queue name when this rule has determined the queue
+   *         </p>
+   *         <p>
+   *         null when this rule leaves the queue undetermined, so that the
+   *         next {@link PlacementRule} in the {@link PlacementManager} is
+   *         asked
+   *         </p>
+   *
+   * @throws YarnException
+   *           if any error happens
+   */
+  public abstract String getQueueForApp(ApplicationSubmissionContext asc,
+      String user) throws YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
new file mode 100644
index 0000000..d617d16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link PlacementRule} that chooses a queue from a list of user and group
+ * {@link QueueMapping}s.
+ */
+public class UserGroupMappingPlacementRule extends PlacementRule {
+  private static final Log LOG = LogFactory
+      .getLog(UserGroupMappingPlacementRule.class);
+
+  /** Placeholder standing for the name of the submitting user. */
+  public static final String CURRENT_USER_MAPPING = "%user";
+
+  /** Placeholder standing for the first group of the submitting user. */
+  public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+  private boolean overrideWithQueueMappings = false;
+  private List<QueueMapping> mappings = null;
+  private Groups groups;
+
+  /** A single mapping from a source (user or group name) to a queue. */
+  @Private
+  public static class QueueMapping {
+
+    /** Whether the source of a mapping names a user or a group. */
+    public enum MappingType {
+
+      USER("u"), GROUP("g");
+      private final String type;
+
+      private MappingType(String type) {
+        this.type = type;
+      }
+
+      @Override
+      public String toString() {
+        return type;
+      }
+
+    }
+
+    MappingType type;
+    String source;
+    String queue;
+
+    public QueueMapping(MappingType type, String source, String queue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+    }
+
+    public String getQueue() {
+      return queue;
+    }
+
+    /**
+     * Hash over the same three fields that {@link #equals(Object)} compares,
+     * so that equal mappings always hash alike.
+     */
+    @Override
+    public int hashCode() {
+      int result = (type == null) ? 0 : type.hashCode();
+      result = 31 * result + ((source == null) ? 0 : source.hashCode());
+      result = 31 * result + ((queue == null) ? 0 : queue.hashCode());
+      return result;
+    }
+
+    /** Two mappings are equal when type, source and queue are all equal. */
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof QueueMapping) {
+        QueueMapping other = (QueueMapping) obj;
+        return (other.type.equals(type) &&
+            other.source.equals(source) &&
+            other.queue.equals(queue));
+      } else {
+        return false;
+      }
+    }
+  }
+
+  /**
+   * @param overrideWithQueueMappings when true, a matching mapping replaces
+   *          whatever queue the application asked for; when false, it only
+   *          replaces the default queue
+   * @param newMappings mappings to evaluate, in order
+   * @param groups group lookup used for group mappings and for
+   *          {@link #PRIMARY_GROUP_MAPPING}
+   */
+  public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
+      List<QueueMapping> newMappings, Groups groups) {
+    this.mappings = newMappings;
+    this.overrideWithQueueMappings = overrideWithQueueMappings;
+    this.groups = groups;
+  }
+
+  /**
+   * Find the queue of the first mapping that matches the user.
+   *
+   * @param user name of the submitting user
+   * @return the mapped queue, or null when no mapping matches
+   * @throws IOException if the group lookup fails
+   */
+  private String getMappedQueue(String user) throws IOException {
+    for (QueueMapping mapping : mappings) {
+      if (mapping.type == MappingType.USER) {
+        // A "%user" source matches every user; the queue may itself be a
+        // placeholder that is resolved against this user.
+        if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+          if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+            return user;
+          } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+            return groups.getGroups(user).get(0);
+          } else {
+            return mapping.queue;
+          }
+        }
+        if (user.equals(mapping.source)) {
+          return mapping.queue;
+        }
+      } else if (mapping.type == MappingType.GROUP) {
+        for (String group : groups.getGroups(user)) {
+          if (group.equals(mapping.source)) {
+            return mapping.queue;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Choose the queue for an application from the configured mappings.
+   *
+   * <p>
+   * A matching mapping is used only when the application asked for the
+   * default queue or when overriding is enabled. In every other case the
+   * queue named in the submission context is returned unchanged.
+   * </p>
+   *
+   * @param asc application submission context
+   * @param user name of the submitting user
+   * @return the mapped queue, or the queue from the submission context
+   * @throws YarnException if the group lookup fails; the underlying
+   *           {@link IOException} is attached as the cause
+   */
+  @Override
+  public String getQueueForApp(ApplicationSubmissionContext asc, String user)
+      throws YarnException {
+    String queueName = asc.getQueue();
+    ApplicationId applicationId = asc.getApplicationId();
+    if (mappings != null && mappings.size() > 0) {
+      try {
+        String mappedQueue = getMappedQueue(user);
+        if (mappedQueue != null) {
+          // We have a mapping, should we use it?
+          // Constant-first comparison: a submission without a queue must
+          // not fail with a NullPointerException here.
+          if (YarnConfiguration.DEFAULT_QUEUE_NAME.equals(queueName)
+              || overrideWithQueueMappings) {
+            LOG.info("Application " + applicationId + " user " + user
+                + " mapping [" + queueName + "] to [" + mappedQueue
+                + "] override " + overrideWithQueueMappings);
+            return mappedQueue;
+          }
+        }
+      } catch (IOException ioex) {
+        String message = "Failed to submit application " + applicationId +
+            " submitted by user " + user + " reason: " + ioex.getMessage();
+        // Keep the original exception as the cause so its stack trace is
+        // not lost.
+        throw new YarnException(message, ioex);
+      }
+    }
+
+    return queueName;
+  }
+
+  /**
+   * @return the mapping list passed to the constructor
+   */
+  @VisibleForTesting
+  public List<QueueMapping> getQueueMappings() {
+    return mappings;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index dbaccaf..ad5c76c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -69,6 +69,9 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@@ -98,8 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -228,16 +229,6 @@ public class CapacityScheduler extends
       CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
           + ".scheduling-interval-ms";
   private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
-  
-  private boolean overrideWithQueueMappings = false;
-  private List<QueueMapping> mappings = null;
-  private Groups groups;
-
-  @VisibleForTesting
-  public synchronized String getMappedQueueForTest(String user)
-      throws IOException {
-    return getMappedQueue(user);
-  }
 
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
@@ -447,29 +438,52 @@ public class CapacityScheduler extends
   }
   private static final QueueHook noop = new QueueHook();
 
-  private void initializeQueueMappings() throws IOException {
-    overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+  @VisibleForTesting
+  public synchronized UserGroupMappingPlacementRule
+      getUserGroupMappingPlacementRule() throws IOException {
+    boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
     LOG.info("Initialized queue mappings, override: "
         + overrideWithQueueMappings);
+
     // Get new user/group mappings
-    List<QueueMapping> newMappings = conf.getQueueMappings();
-    //check if mappings refer to valid queues
+    List<UserGroupMappingPlacementRule.QueueMapping> newMappings =
+        conf.getQueueMappings();
+    // check if mappings refer to valid queues
     for (QueueMapping mapping : newMappings) {
-      if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
-          !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
-        CSQueue queue = queues.get(mapping.queue);
+      String mappingQueue = mapping.getQueue();
+      if (!mappingQueue
+          .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
+          && !mappingQueue
+              .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
+        CSQueue queue = queues.get(mappingQueue);
         if (queue == null || !(queue instanceof LeafQueue)) {
-          throw new IOException(
-              "mapping contains invalid or non-leaf queue " + mapping.queue);
+          throw new IOException("mapping contains invalid or non-leaf queue "
+              + mappingQueue);
         }
       }
     }
-    //apply the new mappings since they are valid
-    mappings = newMappings;
+
     // initialize groups if mappings are present
-    if (mappings.size() > 0) {
-      groups = new Groups(conf);
+    if (newMappings.size() > 0) {
+      Groups groups = new Groups(conf);
+      return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+          newMappings, groups);
     }
+
+    return null;
+  }
+
+  private void updatePlacementRules() throws IOException {
+    List<PlacementRule> placementRules = new ArrayList<>();
+    
+    // Initialize UserGroupMappingPlacementRule
+    // TODO: make this definable through configuration.
+    UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
+    if (null != ugRule) {
+      placementRules.add(ugRule);
+    }
+    
+    rmContext.getQueuePlacementManager().updateRules(placementRules);
   }
 
   @Lock(CapacityScheduler.class)
@@ -481,7 +495,7 @@ public class CapacityScheduler extends
             queues, queues, noop);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     LOG.info("Initialized root queue " + root);
-    initializeQueueMappings();
+    updatePlacementRules();
     setQueueAcls(authorizer, queues);
   }
 
@@ -502,7 +516,7 @@ public class CapacityScheduler extends
     
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
-    initializeQueueMappings();
+    updatePlacementRules();
 
     // Re-calculate headroom for active applications
     root.updateClusterResource(clusterResource, new ResourceLimits(
@@ -647,66 +661,8 @@ public class CapacityScheduler extends
     return queues.get(queueName);
   }
 
-  private static final String CURRENT_USER_MAPPING = "%user";
-
-  private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
-
-  private String getMappedQueue(String user) throws IOException {
-    for (QueueMapping mapping : mappings) {
-      if (mapping.type == MappingType.USER) {
-        if (mapping.source.equals(CURRENT_USER_MAPPING)) {
-          if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
-            return user;
-          }
-          else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
-            return groups.getGroups(user).get(0);
-          }
-          else {
-            return mapping.queue;
-          }
-        }
-        if (user.equals(mapping.source)) {
-          return mapping.queue;
-        }
-      }
-      if (mapping.type == MappingType.GROUP) {
-        for (String userGroups : groups.getGroups(user)) {
-          if (userGroups.equals(mapping.source)) {
-            return mapping.queue;
-          }
-        }
-      }
-    }
-    return null;
-  }
-
   private synchronized void addApplication(ApplicationId applicationId,
       String queueName, String user, boolean isAppRecovering, Priority priority) {
-
-    if (mappings != null && mappings.size() > 0) {
-      try {
-        String mappedQueue = getMappedQueue(user);
-        if (mappedQueue != null) {
-          // We have a mapping, should we use it?
-          if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
-              || overrideWithQueueMappings) {
-            LOG.info("Application " + applicationId + " user " + user
-                + " mapping [" + queueName + "] to [" + mappedQueue
-                + "] override " + overrideWithQueueMappings);
-            queueName = mappedQueue;
-            RMApp rmApp = rmContext.getRMApps().get(applicationId);
-            rmApp.setQueue(queueName);
-          }
-        }
-      } catch (IOException ioex) {
-        String message = "Failed to submit application " + applicationId +
-            " submitted by user " + user + " reason: " + ioex.getMessage();
-        this.rmContext.getDispatcher().getEventHandler()
-            .handle(new RMAppRejectedEvent(applicationId, message));
-        return;
-      }
-    }
-
     // sanity checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index be5e6dd..b1461c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
@@ -211,35 +212,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
   @Private
   public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
-
-  @Private
-  public static class QueueMapping {
-
-    public enum MappingType {
-
-      USER("u"),
-      GROUP("g");
-      private final String type;
-      private MappingType(String type) {
-        this.type = type;
-      }
-
-      public String toString() {
-        return type;
-      }
-
-    };
-
-    MappingType type;
-    String source;
-    String queue;
-
-    public QueueMapping(MappingType type, String source, String queue) {
-      this.type = type;
-      this.source = source;
-      this.queue = queue;
-    }
-  }
   
   @Private
   public static final String AVERAGE_CAPACITY = "average-capacity";
@@ -747,7 +719,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
    */
   public List<QueueMapping> getQueueMappings() {
     List<QueueMapping> mappings =
-        new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+        new ArrayList<QueueMapping>();
     Collection<String> mappingsString =
         getTrimmedStringCollection(QUEUE_MAPPING);
     for (String mappingValue : mappingsString) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index cbeae5b..c435692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,16 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-
-import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -40,8 +34,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
-import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -57,11 +54,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -73,8 +73,11 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -658,6 +661,39 @@ public class TestAppManager{
     Assert.assertTrue(msg.contains("preemptedResources=<memory:1234\\, vCores:56>"));
     Assert.assertTrue(msg.contains("applicationType=MAPREDUCE"));
  }
+  
+  @Test
+  public void testRMAppSubmitWithQueueChanged() throws Exception {
+    // Set up a PlacementManager that changes the app's queue to a new queue
+    PlacementManager placementMgr = mock(PlacementManager.class);
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ApplicationSubmissionContext ctx =
+            (ApplicationSubmissionContext) invocation.getArguments()[0];
+        ctx.setQueue("newQueue");
+        return null;
+      }
+      
+    }).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class),
+            any(String.class));
+    rmContext.setQueuePlacementManager(placementMgr);
+    
+    asContext.setQueue("oldQueue");
+    appMonitor.submitApplication(asContext, "test");
+    RMApp app = rmContext.getRMApps().get(appId);
+    Assert.assertNotNull("app is null", app);
+    Assert.assertEquals("newQueue", asContext.getQueue());
+
+    // wait for event to be processed
+    int timeoutSecs = 0;
+    while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) {
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
+        getAppEventType());
+  }
 
   private static ResourceScheduler mockResourceScheduler() {
     ResourceScheduler scheduler = mock(ResourceScheduler.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
new file mode 100644
index 0000000..61bc8d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestUserGroupMappingPlacementRule {
+  YarnConfiguration conf = new YarnConfiguration();
+
+  @Before
+  public void setup() {
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+  }
+
+  private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
+      String expectedQueue) throws YarnException {
+    verifyQueueMapping(queueMapping, inputUser,
+        YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false);
+  }
+
+  private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
+      String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
+    Groups groups = new Groups(conf);
+    UserGroupMappingPlacementRule rule =
+        new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping),
+            groups);
+    ApplicationSubmissionContext asc =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    asc.setQueue(inputQueue);
+    String queue = rule.getQueueForApp(asc, inputUser);
+    Assert.assertEquals(expectedQueue, queue);
+  }
+
+  @Test
+  public void testMapping() throws YarnException {
+    // simple base case for mapping user to queue
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "a", "q1"), "a", "q1");
+    verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "q1"),
+        "a", "q1");
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "q2"), "a",
+        "q2");
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
+        "a", "a");
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "%user",
+        "%primary_group"), "a", "agroup");
+    verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
+        "a", "q1");
+    
+    // when override is enabled and the user specified a queue, it will be
+    // overridden by the mapped queue
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
+        "user", "q2", "q1", true);
+    
+    // if override is not enabled, the queue the user specified should be kept
+    verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
+        "user", "q2", "q2", false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5468baa8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
index 005f40b..1df6b4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
@@ -18,22 +18,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.GroupMappingServiceProvider;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
-import org.junit.After;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestQueueMappings {
@@ -47,15 +41,23 @@ public class TestQueueMappings {
       CapacitySchedulerConfiguration.ROOT + "." + Q1;
   private final static String Q2_PATH =
       CapacitySchedulerConfiguration.ROOT + "." + Q2;
+  
+  private CapacityScheduler cs;
+  private YarnConfiguration conf;
+  
+  @Before
+  public void setup() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    conf = new YarnConfiguration(csConf);
+    cs = new CapacityScheduler();
 
-  private MockRM resourceManager;
-
-  @After
-  public void tearDown() throws Exception {
-    if (resourceManager != null) {
-      LOG.info("Stopping the resource manager");
-      resourceManager.stop();
-    }
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+    cs.start();
   }
 
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -67,26 +69,32 @@ public class TestQueueMappings {
 
     LOG.info("Setup top-level queues q1 and q2");
   }
+  
+  @Test
+  public void testQueueMappingSpecifyingNotExistedQueue() {
+    // if the mapping specifies a queue that does not exist, reinitialize will
+    // fail
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+        "u:user:non_existent_queue");
+    boolean fail = false;
+    try {
+      cs.reinitialize(conf, null);
+    } catch (IOException ioex) {
+      fail = true;
+    }
+    Assert.assertTrue("queue initialization failed for non-existent q", fail);
+  }
+  
+  @Test
+  public void testQueueMappingTrimSpaces() throws IOException {
+    // space trimming
+    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "    u : a : " + Q1);
+    cs.reinitialize(conf, null);
+    checkQMapping(new QueueMapping(MappingType.USER, "a", Q1));
+  }
 
   @Test (timeout = 60000)
-  public void testQueueMapping() throws Exception {
-    CapacitySchedulerConfiguration csConf =
-        new CapacitySchedulerConfiguration();
-    setupQueueConfiguration(csConf);
-    YarnConfiguration conf = new YarnConfiguration(csConf);
-    CapacityScheduler cs = new CapacityScheduler();
-
-    RMContext rmContext = TestUtils.getMockRMContext();
-    cs.setConf(conf);
-    cs.setRMContext(rmContext);
-    cs.init(conf);
-    cs.start();
-
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
-        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
-    conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
-        "true");
-
+  public void testQueueMappingParsingInvalidCases() throws Exception {
     // configuration parsing tests - negative test cases
     checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
     checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
@@ -97,119 +105,6 @@ public class TestQueueMappings {
     checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
     checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
     checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
-
-    // simple base case for mapping user to queue
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
-    cs.reinitialize(conf, null);
-    checkQMapping("a", Q1, cs);
-
-    // group mapping test
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
-    cs.reinitialize(conf, null);
-    checkQMapping("a", Q1, cs);
-
-    // %user tests
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
-    cs.reinitialize(conf, null);
-    checkQMapping("a", Q2, cs);
-
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
-    cs.reinitialize(conf, null);
-    checkQMapping("a", "a", cs);
-
-    // %primary_group tests
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
-        "u:%user:%primary_group");
-    cs.reinitialize(conf, null);
-    checkQMapping("a", "agroup", cs);
-
-    // non-primary group mapping
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
-        "g:asubgroup1:" + Q1);
-    cs.reinitialize(conf, null);
-    checkQMapping("a", Q1, cs);
-
-    // space trimming
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "    u : a : " + Q1);
-    cs.reinitialize(conf, null);
-    checkQMapping("a", Q1, cs);
-
-    csConf = new CapacitySchedulerConfiguration();
-    csConf.set(YarnConfiguration.RM_SCHEDULER,
-        CapacityScheduler.class.getName());
-    setupQueueConfiguration(csConf);
-    conf = new YarnConfiguration(csConf);
-
-    resourceManager = new MockRM(csConf);
-    resourceManager.start();
-
-    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
-        SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
-    conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
-        "true");
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
-    resourceManager.getResourceScheduler().reinitialize(conf, null);
-
-    // ensure that if the user specifies a Q that is still overriden
-    checkAppQueue(resourceManager, "user", Q2, Q1);
-
-    // toggle admin override and retry
-    conf.setBoolean(
-        CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
-        false);
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
-    setupQueueConfiguration(csConf);
-    resourceManager.getResourceScheduler().reinitialize(conf, null);
-
-    checkAppQueue(resourceManager, "user", Q2, Q2);
-
-    // ensure that if a user does not specify a Q, the user mapping is used
-    checkAppQueue(resourceManager, "user", null, Q1);
-
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
-    setupQueueConfiguration(csConf);
-    resourceManager.getResourceScheduler().reinitialize(conf, null);
-
-    // ensure that if a user does not specify a Q, the group mapping is used
-    checkAppQueue(resourceManager, "user", null, Q2);
-
-    // if the mapping specifies a queue that does not exist, the job is rejected
-    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
-        "u:user:non_existent_queue");
-    setupQueueConfiguration(csConf);
-
-    boolean fail = false;
-    try {
-      resourceManager.getResourceScheduler().reinitialize(conf, null);
-    }
-    catch (IOException ioex) {
-      fail = true;
-    }
-    Assert.assertTrue("queue initialization failed for non-existent q", fail);
-    resourceManager.stop();
-  }
-
-  private void checkAppQueue(MockRM resourceManager, String user,
-      String submissionQueue, String expected)
-      throws Exception {
-    RMApp app = resourceManager.submitApp(200, "name", user,
-        new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
-        null, "MAPREDUCE", false);
-    RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
-        : RMAppState.ACCEPTED;
-    resourceManager.waitForState(app.getApplicationId(), expectedState);
-    // get scheduler app
-    CapacityScheduler cs = (CapacityScheduler)
-        resourceManager.getResourceScheduler();
-    SchedulerApplication schedulerApp =
-        cs.getSchedulerApplications().get(app.getApplicationId());
-    String queue = "";
-    if (schedulerApp != null) {
-      queue = schedulerApp.getQueue().getQueueName();
-    }
-    Assert.assertTrue("expected " + expected + " actual " + queue,
-        expected.equals(queue));
-    Assert.assertEquals(expected, app.getQueue());
   }
 
   private void checkInvalidQMapping(YarnConfiguration conf,
@@ -227,10 +122,12 @@ public class TestQueueMappings {
         fail);
   }
 
-  private void checkQMapping(String user, String expected, CapacityScheduler cs)
+  private void checkQMapping(QueueMapping expected)
           throws IOException {
-    String actual = cs.getMappedQueueForTest(user);
-    Assert.assertTrue("expected " + expected + " actual " + actual,
-        expected.equals(actual));
+    UserGroupMappingPlacementRule rule =
+        (UserGroupMappingPlacementRule) cs.getRMContext()
+            .getQueuePlacementManager().getPlacementRules().get(0);
+    QueueMapping queueMapping = rule.getQueueMappings().get(0);
+    Assert.assertEquals(queueMapping, expected);
   }
 }