You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/15 13:02:37 UTC
hadoop git commit: YARN-3635. Refactored current queue mapping
implementation in CapacityScheduler to use a generic PlacementManager
framework. Contributed by Wangda Tan (cherry picked from commit
5468baa80aa2a3e2a02e9a902deebafd734daf23)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 df9bb7449 -> eacc18677
YARN-3635. Refactored current queue mapping implementation in CapacityScheduler to use a generic PlacementManager framework. Contributed by Wangda Tan
(cherry picked from commit 5468baa80aa2a3e2a02e9a902deebafd734daf23)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eacc1867
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eacc1867
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eacc1867
Branch: refs/heads/branch-2
Commit: eacc18677a09851583f3166a5b2917a68d46e938
Parents: df9bb74
Author: Jian He <ji...@apache.org>
Authored: Tue Sep 15 15:39:20 2015 +0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Sep 15 19:02:17 2015 +0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/RMActiveServiceContext.java | 16 +-
.../server/resourcemanager/RMAppManager.java | 9 +
.../yarn/server/resourcemanager/RMContext.java | 5 +
.../server/resourcemanager/RMContextImpl.java | 12 +-
.../placement/PlacementManager.java | 95 +++++++++
.../placement/PlacementRule.java | 55 +++++
.../UserGroupMappingPlacementRule.java | 164 +++++++++++++++
.../scheduler/capacity/CapacityScheduler.java | 126 ++++--------
.../CapacitySchedulerConfiguration.java | 32 +--
.../server/resourcemanager/TestAppManager.java | 54 ++++-
.../TestUserGroupMappingPlacementRule.java | 89 ++++++++
.../scheduler/capacity/TestQueueMappings.java | 203 +++++--------------
13 files changed, 584 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d4fd1fb..5f3dcf6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -403,6 +403,9 @@ Release 2.8.0 - UNRELEASED
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
container allocation logic. (Wangda Tan via jianhe)
+ YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
+ to use a generic PlacementManager framework. (Wangda Tan via jianhe)
+
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 1abb14e..c71323f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -99,9 +100,10 @@ public class RMActiveServiceContext {
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
+ private PlacementManager queuePlacementManager = null;
public RMActiveServiceContext() {
-
+ queuePlacementManager = new PlacementManager();
}
@Private
@@ -424,4 +426,16 @@ public class RMActiveServiceContext {
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
+
+ @Private
+ @Unstable
+ public PlacementManager getQueuePlacementManager() {
+ return queuePlacementManager;
+ }
+
+ @Private
+ @Unstable
+ public void setQueuePlacementManager(PlacementManager placementMgr) {
+ this.queuePlacementManager = placementMgr;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6fd1838..703ec1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -326,6 +326,15 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException {
+ // Do queue mapping
+ if (!isRecovery) {
+ if (rmContext.getQueuePlacementManager() != null) {
+ // We only do queue mapping when it's a new application
+ rmContext.getQueuePlacementManager().placeApplication(
+ submissionContext, user);
+ }
+ }
+
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index bc50268..b64c834 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -124,4 +125,8 @@ public interface RMContext {
boolean isSchedulerReadyForAllocatingContainers();
Configuration getYarnConfiguration();
+
+ PlacementManager getQueuePlacementManager();
+
+ void setQueuePlacementManager(PlacementManager placementMgr);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d6d573d..840cea7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -76,7 +77,6 @@ public class RMContextImpl implements RMContext {
* individual fields.
*/
public RMContextImpl() {
-
}
@VisibleForTesting
@@ -438,4 +438,14 @@ public class RMContextImpl implements RMContext {
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration=yarnConfiguration;
}
+
+ @Override
+ public PlacementManager getQueuePlacementManager() {
+ return this.activeServiceContext.getQueuePlacementManager();
+ }
+
+ @Override
+ public void setQueuePlacementManager(PlacementManager placementMgr) {
+ this.activeServiceContext.setQueuePlacementManager(placementMgr);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
new file mode 100644
index 0000000..43a4deb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class PlacementManager {
+ private static final Log LOG = LogFactory.getLog(PlacementManager.class);
+
+ List<PlacementRule> rules;
+ ReadLock readLock;
+ WriteLock writeLock;
+
+ public PlacementManager() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ public void updateRules(List<PlacementRule> rules) {
+ try {
+ writeLock.lock();
+ this.rules = rules;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void placeApplication(ApplicationSubmissionContext asc, String user)
+ throws YarnException {
+ try {
+ readLock.lock();
+ if (null == rules || rules.isEmpty()) {
+ return;
+ }
+
+ String newQueueName = null;
+ for (PlacementRule rule : rules) {
+ newQueueName = rule.getQueueForApp(asc, user);
+ if (newQueueName != null) {
+ break;
+ }
+ }
+
+ // Failed to get where to place application
+ if (null == newQueueName && null == asc.getQueue()) {
+ String msg = "Failed to get where to place application="
+ + asc.getApplicationId();
+ LOG.error(msg);
+ throw new YarnException(msg);
+ }
+
+ // Set it to ApplicationSubmissionContext
+ if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
+ LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
+ + newQueueName + ", original queue=" + asc.getQueue());
+ asc.setQueue(newQueueName);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ public List<PlacementRule> getPlacementRules() {
+ return rules;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
new file mode 100644
index 0000000..47dc48a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+public abstract class PlacementRule {
+ public String getName() {
+ return this.getClass().getName();
+ }
+
+ public void initialize(Map<String, String> parameters, RMContext rmContext)
+ throws YarnException {
+ }
+
+ /**
+ * Get queue for a given application
+ *
+ * @param asc application submission context
+ * @param user userName
+ *
+ * @throws YarnException
+ * if any error happens
+ *
+ * @return <p>
+ * non-null value means it is determined
+ * </p>
+ * <p>
+ * null value means it is undetermined, so next {@link PlacementRule}
+ * in the {@link PlacementManager} will take care
+ * </p>
+ */
+ public abstract String getQueueForApp(ApplicationSubmissionContext asc,
+ String user) throws YarnException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
new file mode 100644
index 0000000..d617d16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class UserGroupMappingPlacementRule extends PlacementRule {
+ private static final Log LOG = LogFactory
+ .getLog(UserGroupMappingPlacementRule.class);
+
+ public static final String CURRENT_USER_MAPPING = "%user";
+
+ public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = null;
+ private Groups groups;
+
+ @Private
+ public static class QueueMapping {
+
+ public enum MappingType {
+
+ USER("u"), GROUP("g");
+ private final String type;
+
+ private MappingType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+
+ };
+
+ MappingType type;
+ String source;
+ String queue;
+
+ public QueueMapping(MappingType type, String source, String queue) {
+ this.type = type;
+ this.source = source;
+ this.queue = queue;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof QueueMapping) {
+ QueueMapping other = (QueueMapping) obj;
+ return (other.type.equals(type) &&
+ other.source.equals(source) &&
+ other.queue.equals(queue));
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
+ List<QueueMapping> newMappings, Groups groups) {
+ this.mappings = newMappings;
+ this.overrideWithQueueMappings = overrideWithQueueMappings;
+ this.groups = groups;
+ }
+
+ private String getMappedQueue(String user) throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.type == MappingType.USER) {
+ if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+ if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+ return user;
+ } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ return groups.getGroups(user).get(0);
+ } else {
+ return mapping.queue;
+ }
+ }
+ if (user.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ if (mapping.type == MappingType.GROUP) {
+ for (String userGroups : groups.getGroups(user)) {
+ if (userGroups.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String getQueueForApp(ApplicationSubmissionContext asc, String user)
+ throws YarnException {
+ String queueName = asc.getQueue();
+ ApplicationId applicationId = asc.getApplicationId();
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ String mappedQueue = getMappedQueue(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ || overrideWithQueueMappings) {
+ LOG.info("Application " + applicationId + " user " + user
+ + " mapping [" + queueName + "] to [" + mappedQueue
+ + "] override " + overrideWithQueueMappings);
+ return mappedQueue;
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ throw new YarnException(message);
+ }
+ }
+
+ return queueName;
+ }
+
+ @VisibleForTesting
+ public List<QueueMapping> getQueueMappings() {
+ return mappings;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index dbaccaf..ad5c76c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -69,6 +69,9 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@@ -98,8 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -228,16 +229,6 @@ public class CapacityScheduler extends
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
-
- private boolean overrideWithQueueMappings = false;
- private List<QueueMapping> mappings = null;
- private Groups groups;
-
- @VisibleForTesting
- public synchronized String getMappedQueueForTest(String user)
- throws IOException {
- return getMappedQueue(user);
- }
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
@@ -447,29 +438,52 @@ public class CapacityScheduler extends
}
private static final QueueHook noop = new QueueHook();
- private void initializeQueueMappings() throws IOException {
- overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ @VisibleForTesting
+ public synchronized UserGroupMappingPlacementRule
+ getUserGroupMappingPlacementRule() throws IOException {
+ boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info("Initialized queue mappings, override: "
+ overrideWithQueueMappings);
+
// Get new user/group mappings
- List<QueueMapping> newMappings = conf.getQueueMappings();
- //check if mappings refer to valid queues
+ List<UserGroupMappingPlacementRule.QueueMapping> newMappings =
+ conf.getQueueMappings();
+ // check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
- if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
- !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
- CSQueue queue = queues.get(mapping.queue);
+ String mappingQueue = mapping.getQueue();
+ if (!mappingQueue
+ .equals(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
+ && !mappingQueue
+ .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
+ CSQueue queue = queues.get(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
- throw new IOException(
- "mapping contains invalid or non-leaf queue " + mapping.queue);
+ throw new IOException("mapping contains invalid or non-leaf queue "
+ + mappingQueue);
}
}
}
- //apply the new mappings since they are valid
- mappings = newMappings;
+
// initialize groups if mappings are present
- if (mappings.size() > 0) {
- groups = new Groups(conf);
+ if (newMappings.size() > 0) {
+ Groups groups = new Groups(conf);
+ return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+ newMappings, groups);
}
+
+ return null;
+ }
+
+ private void updatePlacementRules() throws IOException {
+ List<PlacementRule> placementRules = new ArrayList<>();
+
+ // Initialize UserGroupMappingPlacementRule
+ // TODO, need make this defineable by configuration.
+ UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
+ if (null != ugRule) {
+ placementRules.add(ugRule);
+ }
+
+ rmContext.getQueuePlacementManager().updateRules(placementRules);
}
@Lock(CapacityScheduler.class)
@@ -481,7 +495,7 @@ public class CapacityScheduler extends
queues, queues, noop);
labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
- initializeQueueMappings();
+ updatePlacementRules();
setQueueAcls(authorizer, queues);
}
@@ -502,7 +516,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
- initializeQueueMappings();
+ updatePlacementRules();
// Re-calculate headroom for active applications
root.updateClusterResource(clusterResource, new ResourceLimits(
@@ -647,66 +661,8 @@ public class CapacityScheduler extends
return queues.get(queueName);
}
- private static final String CURRENT_USER_MAPPING = "%user";
-
- private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
-
- private String getMappedQueue(String user) throws IOException {
- for (QueueMapping mapping : mappings) {
- if (mapping.type == MappingType.USER) {
- if (mapping.source.equals(CURRENT_USER_MAPPING)) {
- if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
- return user;
- }
- else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
- return groups.getGroups(user).get(0);
- }
- else {
- return mapping.queue;
- }
- }
- if (user.equals(mapping.source)) {
- return mapping.queue;
- }
- }
- if (mapping.type == MappingType.GROUP) {
- for (String userGroups : groups.getGroups(user)) {
- if (userGroups.equals(mapping.source)) {
- return mapping.queue;
- }
- }
- }
- }
- return null;
- }
-
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering, Priority priority) {
-
- if (mappings != null && mappings.size() > 0) {
- try {
- String mappedQueue = getMappedQueue(user);
- if (mappedQueue != null) {
- // We have a mapping, should we use it?
- if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
- || overrideWithQueueMappings) {
- LOG.info("Application " + applicationId + " user " + user
- + " mapping [" + queueName + "] to [" + mappedQueue
- + "] override " + overrideWithQueueMappings);
- queueName = mappedQueue;
- RMApp rmApp = rmContext.getRMApps().get(applicationId);
- rmApp.setQueue(queueName);
- }
- }
- } catch (IOException ioex) {
- String message = "Failed to submit application " + applicationId +
- " submitted by user " + user + " reason: " + ioex.getMessage();
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppRejectedEvent(applicationId, message));
- return;
- }
- }
-
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index be5e6dd..b1461c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
@@ -211,35 +212,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final Integer DEFAULT_CONFIGURATION_APPLICATION_PRIORITY = 0;
-
- @Private
- public static class QueueMapping {
-
- public enum MappingType {
-
- USER("u"),
- GROUP("g");
- private final String type;
- private MappingType(String type) {
- this.type = type;
- }
-
- public String toString() {
- return type;
- }
-
- };
-
- MappingType type;
- String source;
- String queue;
-
- public QueueMapping(MappingType type, String source, String queue) {
- this.type = type;
- this.source = source;
- this.queue = queue;
- }
- }
@Private
public static final String AVERAGE_CAPACITY = "average-capacity";
@@ -747,7 +719,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
*/
public List<QueueMapping> getQueueMappings() {
List<QueueMapping> mappings =
- new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+ new ArrayList<QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index cbeae5b..c435692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,16 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-
-import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -40,8 +34,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
-import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -57,11 +54,14 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -73,8 +73,11 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -658,6 +661,39 @@ public class TestAppManager{
Assert.assertTrue(msg.contains("preemptedResources=<memory:1234\\, vCores:56>"));
Assert.assertTrue(msg.contains("applicationType=MAPREDUCE"));
}
+
+ @Test
+ public void testRMAppSubmitWithQueueChanged() throws Exception {
+ // Setup a PlacementManager returns a new queue
+ PlacementManager placementMgr = mock(PlacementManager.class);
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ApplicationSubmissionContext ctx =
+ (ApplicationSubmissionContext) invocation.getArguments()[0];
+ ctx.setQueue("newQueue");
+ return null;
+ }
+
+ }).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class),
+ any(String.class));
+ rmContext.setQueuePlacementManager(placementMgr);
+
+ asContext.setQueue("oldQueue");
+ appMonitor.submitApplication(asContext, "test");
+ RMApp app = rmContext.getRMApps().get(appId);
+ Assert.assertNotNull("app is null", app);
+ Assert.assertEquals("newQueue", asContext.getQueue());
+
+ // wait for event to be processed
+ int timeoutSecs = 0;
+ while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) {
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
+ getAppEventType());
+ }
private static ResourceScheduler mockResourceScheduler() {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
new file mode 100644
index 0000000..61bc8d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestUserGroupMappingPlacementRule {
+ YarnConfiguration conf = new YarnConfiguration();
+
+ @Before
+ public void setup() {
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ }
+
+ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
+ String expectedQueue) throws YarnException {
+ verifyQueueMapping(queueMapping, inputUser,
+ YarnConfiguration.DEFAULT_QUEUE_NAME, expectedQueue, false);
+ }
+
+ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
+ String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
+ Groups groups = new Groups(conf);
+ UserGroupMappingPlacementRule rule =
+ new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping),
+ groups);
+ ApplicationSubmissionContext asc =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ asc.setQueue(inputQueue);
+ String queue = rule.getQueueForApp(asc, inputUser);
+ Assert.assertEquals(expectedQueue, queue);
+ }
+
+ @Test
+ public void testMapping() throws YarnException {
+ // simple base case for mapping user to queue
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "a", "q1"), "a", "q1");
+ verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "q1"),
+ "a", "q1");
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "q2"), "a",
+ "q2");
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
+ "a", "a");
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "%user",
+ "%primary_group"), "a", "agroup");
+ verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
+ "a", "q1");
+
+ // specify overwritten, and see if user specified a queue, and it will be
+ // overridden
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
+ "user", "q2", "q1", true);
+
+ // if overwritten not specified, it should be which user specified
+ verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
+ "user", "q2", "q2", false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eacc1867/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
index 005f40b..1df6b4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
@@ -18,22 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.GroupMappingServiceProvider;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
-import org.junit.After;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestQueueMappings {
@@ -47,15 +41,23 @@ public class TestQueueMappings {
CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q2;
+
+ private CapacityScheduler cs;
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ conf = new YarnConfiguration(csConf);
+ cs = new CapacityScheduler();
- private MockRM resourceManager;
-
- @After
- public void tearDown() throws Exception {
- if (resourceManager != null) {
- LOG.info("Stopping the resource manager");
- resourceManager.stop();
- }
+ RMContext rmContext = TestUtils.getMockRMContext();
+ cs.setConf(conf);
+ cs.setRMContext(rmContext);
+ cs.init(conf);
+ cs.start();
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
@@ -67,26 +69,32 @@ public class TestQueueMappings {
LOG.info("Setup top-level queues q1 and q2");
}
+
+ @Test
+ public void testQueueMappingSpecifyingNotExistedQueue() {
+ // if the mapping specifies a queue that does not exist, reinitialize will
+ // be failed
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+ "u:user:non_existent_queue");
+ boolean fail = false;
+ try {
+ cs.reinitialize(conf, null);
+ } catch (IOException ioex) {
+ fail = true;
+ }
+ Assert.assertTrue("queue initialization failed for non-existent q", fail);
+ }
+
+ @Test
+ public void testQueueMappingTrimSpaces() throws IOException {
+ // space trimming
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
+ cs.reinitialize(conf, null);
+ checkQMapping(new QueueMapping(MappingType.USER, "a", Q1));
+ }
@Test (timeout = 60000)
- public void testQueueMapping() throws Exception {
- CapacitySchedulerConfiguration csConf =
- new CapacitySchedulerConfiguration();
- setupQueueConfiguration(csConf);
- YarnConfiguration conf = new YarnConfiguration(csConf);
- CapacityScheduler cs = new CapacityScheduler();
-
- RMContext rmContext = TestUtils.getMockRMContext();
- cs.setConf(conf);
- cs.setRMContext(rmContext);
- cs.init(conf);
- cs.start();
-
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
- SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
- conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
- "true");
-
+ public void testQueueMappingParsingInvalidCases() throws Exception {
// configuration parsing tests - negative test cases
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
@@ -97,119 +105,6 @@ public class TestQueueMappings {
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
-
- // simple base case for mapping user to queue
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
- cs.reinitialize(conf, null);
- checkQMapping("a", Q1, cs);
-
- // group mapping test
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
- cs.reinitialize(conf, null);
- checkQMapping("a", Q1, cs);
-
- // %user tests
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
- cs.reinitialize(conf, null);
- checkQMapping("a", Q2, cs);
-
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
- cs.reinitialize(conf, null);
- checkQMapping("a", "a", cs);
-
- // %primary_group tests
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
- "u:%user:%primary_group");
- cs.reinitialize(conf, null);
- checkQMapping("a", "agroup", cs);
-
- // non-primary group mapping
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
- "g:asubgroup1:" + Q1);
- cs.reinitialize(conf, null);
- checkQMapping("a", Q1, cs);
-
- // space trimming
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
- cs.reinitialize(conf, null);
- checkQMapping("a", Q1, cs);
-
- csConf = new CapacitySchedulerConfiguration();
- csConf.set(YarnConfiguration.RM_SCHEDULER,
- CapacityScheduler.class.getName());
- setupQueueConfiguration(csConf);
- conf = new YarnConfiguration(csConf);
-
- resourceManager = new MockRM(csConf);
- resourceManager.start();
-
- conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
- SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
- conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
- "true");
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
- resourceManager.getResourceScheduler().reinitialize(conf, null);
-
- // ensure that if the user specifies a Q that is still overriden
- checkAppQueue(resourceManager, "user", Q2, Q1);
-
- // toggle admin override and retry
- conf.setBoolean(
- CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
- false);
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
- setupQueueConfiguration(csConf);
- resourceManager.getResourceScheduler().reinitialize(conf, null);
-
- checkAppQueue(resourceManager, "user", Q2, Q2);
-
- // ensure that if a user does not specify a Q, the user mapping is used
- checkAppQueue(resourceManager, "user", null, Q1);
-
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
- setupQueueConfiguration(csConf);
- resourceManager.getResourceScheduler().reinitialize(conf, null);
-
- // ensure that if a user does not specify a Q, the group mapping is used
- checkAppQueue(resourceManager, "user", null, Q2);
-
- // if the mapping specifies a queue that does not exist, the job is rejected
- conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
- "u:user:non_existent_queue");
- setupQueueConfiguration(csConf);
-
- boolean fail = false;
- try {
- resourceManager.getResourceScheduler().reinitialize(conf, null);
- }
- catch (IOException ioex) {
- fail = true;
- }
- Assert.assertTrue("queue initialization failed for non-existent q", fail);
- resourceManager.stop();
- }
-
- private void checkAppQueue(MockRM resourceManager, String user,
- String submissionQueue, String expected)
- throws Exception {
- RMApp app = resourceManager.submitApp(200, "name", user,
- new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
- null, "MAPREDUCE", false);
- RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
- : RMAppState.ACCEPTED;
- resourceManager.waitForState(app.getApplicationId(), expectedState);
- // get scheduler app
- CapacityScheduler cs = (CapacityScheduler)
- resourceManager.getResourceScheduler();
- SchedulerApplication schedulerApp =
- cs.getSchedulerApplications().get(app.getApplicationId());
- String queue = "";
- if (schedulerApp != null) {
- queue = schedulerApp.getQueue().getQueueName();
- }
- Assert.assertTrue("expected " + expected + " actual " + queue,
- expected.equals(queue));
- Assert.assertEquals(expected, app.getQueue());
}
private void checkInvalidQMapping(YarnConfiguration conf,
@@ -227,10 +122,12 @@ public class TestQueueMappings {
fail);
}
- private void checkQMapping(String user, String expected, CapacityScheduler cs)
+ private void checkQMapping(QueueMapping expected)
throws IOException {
- String actual = cs.getMappedQueueForTest(user);
- Assert.assertTrue("expected " + expected + " actual " + actual,
- expected.equals(actual));
+ UserGroupMappingPlacementRule rule =
+ (UserGroupMappingPlacementRule) cs.getRMContext()
+ .getQueuePlacementManager().getPlacementRules().get(0);
+ QueueMapping queueMapping = rule.getQueueMappings().get(0);
+ Assert.assertEquals(queueMapping, expected);
}
}