You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2014/08/19 01:41:57 UTC
svn commit: r1618764 - in
/hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoo...
Author: szetszwo
Date: Mon Aug 18 23:41:53 2014
New Revision: 1618764
URL: http://svn.apache.org/r1618764
Log:
Merge r1609845 through r1618763 from trunk.
Added:
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
- copied unchanged from r1618763, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
Modified:
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt?rev=1618764&r1=1618763&r2=1618764&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt Mon Aug 18 23:41:53 2014
@@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
YARN-2378. Added support for moving applications across queues in
CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
+ YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh
+ via jianhe)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1618764&r1=1618763&r2=1618764&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml Mon Aug 18 23:41:53 2014
@@ -108,4 +108,27 @@
</description>
</property>
+ <property>
+ <name>yarn.scheduler.capacity.queue-mappings</name>
+ <value></value>
+ <description>
+ A list of mappings that will be used to assign jobs to queues
+ The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
+ Typically this list will be used to map users to queues,
+ for example, u:%user:%user maps all users to queues with the same name
+ as the user.
+ </description>
+ </property>
+
+ <property>
+ <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
+ <value>false</value>
+ <description>
+ If a queue mapping is present, will it override the value specified
+ by the user? This can be used by administrators to place jobs in queues
+ that are different than the one specified by the user.
+ The default is false.
+ </description>
+ </property>
+
</configuration>
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1618764&r1=1618763&r2=1618764&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Aug 18 23:41:53 2014
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -41,6 +39,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resou
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@@ -199,6 +198,16 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
+ private boolean overrideWithQueueMappings = false;
+ private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
+ private Groups groups;
+
+ @VisibleForTesting
+ public synchronized String getMappedQueueForTest(String user)
+ throws IOException {
+ return getMappedQueue(user);
+ }
+
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@@ -263,7 +272,6 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
-
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -402,7 +410,32 @@ public class CapacityScheduler extends
}
}
private static final QueueHook noop = new QueueHook();
-
+
+ private void initializeQueueMappings() throws IOException {
+ overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info("Initialized queue mappings, override: "
+ + overrideWithQueueMappings);
+ // Get new user/group mappings
+ List<QueueMapping> newMappings = conf.getQueueMappings();
+ //check if mappings refer to valid queues
+ for (QueueMapping mapping : newMappings) {
+ if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+ !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ CSQueue queue = queues.get(mapping.queue);
+ if (queue == null || !(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping.queue);
+ }
+ }
+ }
+ //apply the new mappings since they are valid
+ mappings = newMappings;
+ // initialize groups if mappings are present
+ if (mappings.size() > 0) {
+ groups = new Groups(conf);
+ }
+ }
+
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@@ -410,7 +443,9 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
+
LOG.info("Initialized root queue " + root);
+ initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@@ -430,6 +465,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
+ initializeQueueMappings();
}
/**
@@ -517,12 +553,73 @@ public class CapacityScheduler extends
}
synchronized CSQueue getQueue(String queueName) {
+ if (queueName == null) {
+ return null;
+ }
return queues.get(queueName);
}
+ private static final String CURRENT_USER_MAPPING = "%user";
+
+ private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+ private String getMappedQueue(String user) throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.type == MappingType.USER) {
+ if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+ if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+ return user;
+ }
+ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ return groups.getGroups(user).get(0);
+ }
+ else {
+ return mapping.queue;
+ }
+ }
+ if (user.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ if (mapping.type == MappingType.GROUP) {
+ for (String userGroups : groups.getGroups(user)) {
+ if (userGroups.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
- // santiy checks.
+ String queueName, String user, boolean isAppRecovering) {
+
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ String mappedQueue = getMappedQueue(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ || overrideWithQueueMappings) {
+ LOG.info("Application " + applicationId + " user " + user
+ + " mapping [" + queueName + "] to [" + mappedQueue
+ + "] override " + overrideWithQueueMappings);
+ queueName = mappedQueue;
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ rmApp.setQueue(queueName);
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
+ return;
+ }
+ }
+
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@@ -902,8 +999,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getQueue(),
+ appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1618764&r1=1618763&r2=1618764&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Mon Aug 18 23:41:53 2014
@@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+ @Private
+ public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+ @Private
+ public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+ @Private
+ public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+ @Private
+ public static class QueueMapping {
+
+ public enum MappingType {
+
+ USER("u"),
+ GROUP("g");
+ private final String type;
+ private MappingType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+
+ };
+
+ MappingType type;
+ String source;
+ String queue;
+
+ public QueueMapping(MappingType type, String source, String queue) {
+ this.type = type;
+ this.source = source;
+ this.queue = queue;
+ }
+ }
public CapacitySchedulerConfiguration() {
this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
+ public boolean getOverrideWithQueueMappings() {
+ return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+ DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+ }
+
+ /**
+ * Returns a collection of strings, trimming leading and trailing whitespeace
+ * on each value
+ *
+ * @param str
+ * String to parse
+ * @param delim
+ * delimiter to separate the values
+ * @return Collection of parsed elements.
+ */
+ private static Collection<String> getTrimmedStringCollection(String str,
+ String delim) {
+ List<String> values = new ArrayList<String>();
+ if (str == null)
+ return values;
+ StringTokenizer tokenizer = new StringTokenizer(str, delim);
+ while (tokenizer.hasMoreTokens()) {
+ String next = tokenizer.nextToken();
+ if (next == null || next.trim().isEmpty()) {
+ continue;
+ }
+ values.add(next.trim());
+ }
+ return values;
+ }
+
+ /**
+ * Get user/group mappings to queues.
+ *
+ * @return user/groups mappings or null on illegal configs
+ */
+ public List<QueueMapping> getQueueMappings() {
+ List<QueueMapping> mappings =
+ new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
+ Collection<String> mappingsString =
+ getTrimmedStringCollection(QUEUE_MAPPING);
+ for (String mappingValue : mappingsString) {
+ String[] mapping =
+ getTrimmedStringCollection(mappingValue, ":")
+ .toArray(new String[] {});
+ if (mapping.length != 3 || mapping[1].length() == 0
+ || mapping[2].length() == 0) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ QueueMapping m;
+ try {
+ QueueMapping.MappingType mappingType;
+ if (mapping[0].equals("u")) {
+ mappingType = QueueMapping.MappingType.USER;
+ } else if (mapping[0].equals("g")) {
+ mappingType = QueueMapping.MappingType.GROUP;
+ } else {
+ throw new IllegalArgumentException(
+ "unknown mapping prefix " + mapping[0]);
+ }
+ m = new QueueMapping(
+ mappingType,
+ mapping[1],
+ mapping[2]);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ if (m != null) {
+ mappings.add(m);
+ }
+ }
+
+ return mappings;
+ }
}