You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [20/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueAclsInfo.java Sat Nov 28 20:26:01 2009
@@ -17,28 +17,20 @@
*/
package org.apache.hadoop.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
/**
* Class to encapsulate Queue ACLs for a particular
* user.
- *
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.QueueAclsInfo} instead
*/
-class QueueAclsInfo implements Writable {
+@Deprecated
+class QueueAclsInfo extends org.apache.hadoop.mapreduce.QueueAclsInfo {
- private String queueName;
- private String[] operations;
/**
* Default constructor for QueueAclsInfo.
*
*/
QueueAclsInfo() {
-
+ super();
}
/**
@@ -50,31 +42,11 @@
*
*/
QueueAclsInfo(String queueName, String[] operations) {
- this.queueName = queueName;
- this.operations = operations;
- }
-
- String getQueueName() {
- return queueName;
- }
-
- void setQueueName(String queueName) {
- this.queueName = queueName;
+ super(queueName, operations);
}
-
- String[] getOperations() {
- return operations;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- queueName = Text.readString(in);
- operations = WritableUtils.readStringArray(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, queueName);
- WritableUtils.writeStringArray(out, operations);
+
+ public static QueueAclsInfo downgrade(
+ org.apache.hadoop.mapreduce.QueueAclsInfo acl) {
+ return new QueueAclsInfo(acl.getQueueName(), acl.getOperations());
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java Sat Nov 28 20:26:01 2009
@@ -18,171 +18,260 @@
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+
import java.io.IOException;
-import java.io.PrintWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.List;
+import java.net.URL;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Queue.QueueState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-import org.apache.hadoop.util.StringUtils;
/**
* Class that exposes information about queues maintained by the Hadoop
* Map/Reduce framework.
- *
+ * <p/>
* The Map/Reduce framework can be configured with one or more queues,
- * depending on the scheduler it is configured with. While some
- * schedulers work only with one queue, some schedulers support multiple
- * queues.
- *
+ * depending on the scheduler it is configured with. While some
+ * schedulers work only with one queue, some schedulers support multiple
+ * queues. Some schedulers also support the notion of queues within
+ * queues - a feature called hierarchical queues.
+ * <p/>
+ * Queue names are unique, and used as a key to lookup queues. Hierarchical
+ * queues are named by a 'fully qualified name' such as q1:q2:q3, where
+ * q2 is a child queue of q1 and q3 is a child queue of q2.
+ * <p/>
+ * Leaf level queues are queues that contain no queues within them. Jobs
+ * can be submitted only to leaf level queues.
+ * <p/>
* Queues can be configured with various properties. Some of these
* properties are common to all schedulers, and those are handled by this
- * class. Schedulers might also associate several custom properties with
- * queues. Where such a case exists, the queue name must be used to link
- * the common properties with the scheduler specific ones.
+ * class. Schedulers might also associate several custom properties with
+ * queues. These properties are parsed and maintained per queue by the
+ * framework. If schedulers need more complicated structure to maintain
+ * configuration per queue, they are free to not use the facilities
+ * provided by the framework, but define their own mechanisms. In such cases,
+ * it is likely that the name of the queue will be used to relate the
+ * common properties of a queue with scheduler specific properties.
+ * <p/>
+ * Information related to a queue, such as its name, properties, scheduling
+ * information and children are exposed by this class via a serializable
+ * class called {@link JobQueueInfo}.
+ * <p/>
+ * Queues are configured in the configuration file mapred-queues.xml.
+ * To support backwards compatibility, queues can also be configured
+ * in mapred-site.xml. However, when configured in the latter, there is
+ * no support for hierarchical queues.
*/
+
class QueueManager {
-
- private static final Log LOG = LogFactory.getLog(QueueManager.class);
- // Configured queues this is backed by queues Map , mentioned below
- private Set<String> queueNames;
+ private static final Log LOG = LogFactory.getLog(QueueManager.class);
// Map of a queue name and Queue object
- private HashMap<String, Queue> queues;
-
- // Whether ACLs are enabled in the system or not.
- private boolean aclsEnabled;
-
- static final String QUEUE_STATE_SUFFIX = "state";
-
+ private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
+ private Map<String, Queue> allQueues = new HashMap<String, Queue>();
static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
// Prefix in configuration for queue related keys
static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
- = "mapred.queue.";//Resource in which queue acls are configured.
+ = "mapred.queue.";
+ //Resource in which queue acls are configured.
+
+ private Queue root = null;
+ private boolean isAclEnabled = false;
+
+ /**
+ * Factory method to create an appropriate instance of a queue
+ * configuration parser.
+ * <p/>
+ * Returns a parser that can parse either the deprecated property
+ * style queue configuration in mapred-site.xml, or one that can
+ * parse hierarchical queues in mapred-queues.xml. First preference
+ * is given to configuration in mapred-site.xml. If no queue
+ * configuration is found there, then a parser that can parse
+ * configuration in mapred-queues.xml is created.
+ *
+ * @param conf Configuration instance that determines which parser
+ * to use.
+ * @return Queue configuration parser
+ */
+ static QueueConfigurationParser getQueueConfigurationParser(
+ Configuration conf, boolean reloadConf) {
+ if (conf != null && conf.get(
+ DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
+ if (reloadConf) {
+ conf.reloadConfiguration();
+ }
+ return new DeprecatedQueueConfigurationParser(conf);
+ } else {
+ URL filePath =
+ Thread.currentThread().getContextClassLoader()
+ .getResource(QUEUE_CONF_FILE_NAME);
+ return new QueueConfigurationParser(filePath.getPath());
+ }
+ }
+
+ public QueueManager() {
+ initialize(getQueueConfigurationParser(null, false));
+ }
/**
* Construct a new QueueManager using configuration specified in the passed
* in {@link org.apache.hadoop.conf.Configuration} object.
- *
+ * <p/>
+ * This instance supports queue configuration specified in mapred-site.xml,
+ * but without support for hierarchical queues. If no queue configuration
+ * is found in mapred-site.xml, it will then look for site configuration
+ * in mapred-queues.xml supporting hierarchical queues.
+ *
* @param conf Configuration object where queue configuration is specified.
*/
public QueueManager(Configuration conf) {
- checkDeprecation(conf);
- conf.addResource(QUEUE_CONF_FILE_NAME);
-
- queues = new HashMap<String, Queue>();
+ initialize(getQueueConfigurationParser(conf, false));
+ }
- // First get the queue names
- String[] queueNameValues = conf.getStrings("mapred.queue.names",
- new String[]{JobConf.DEFAULT_QUEUE_NAME});
-
- // Get configured ACLs and state for each queue
- aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
- for (String name : queueNameValues) {
- try {
- Map<String, AccessControlList> acls = getQueueAcls(name, conf);
- QueueState state = getQueueState(name, conf);
- queues.put(name, new Queue(name, acls, state));
- } catch (Throwable t) {
- LOG.warn("Not able to initialize queue " + name);
- }
- }
- // Sync queue names with the configured queues.
- queueNames = queues.keySet();
+ /**
+ * Create an instance that supports hierarchical queues, defined in
+ * the passed in configuration file.
+ * <p/>
+ * This is mainly used for testing purposes and should not called from
+ * production code.
+ *
+ * @param confFile File where the queue configuration is found.
+ */
+ QueueManager(String confFile) {
+ QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+ initialize(cp);
}
-
+
/**
- * Return the set of queues configured in the system.
+ * Initialize the queue-manager with the queue hierarchy specified by the
+ * given {@link QueueConfigurationParser}.
*
- * The number of queues configured should be dependent on the Scheduler
+ * @param cp
+ */
+ private void initialize(QueueConfigurationParser cp) {
+ this.root = cp.getRoot();
+ leafQueues.clear();
+ allQueues.clear();
+ //At this point we have root populated
+ //update data structures leafNodes.
+ leafQueues = getRoot().getLeafQueues();
+ allQueues.putAll(getRoot().getInnerQueues());
+ allQueues.putAll(leafQueues);
+
+ LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
+ this.isAclEnabled = cp.isAclsEnabled();
+ }
+
+
+ /**
+ * Return the set of leaf level queues configured in the system to
+ * which jobs are submitted.
+ * <p/>
+ * The number of queues configured should be dependent on the Scheduler
* configured. Note that some schedulers work with only one queue, whereas
* others can support multiple queues.
- *
+ *
* @return Set of queue names.
*/
- public synchronized Set<String> getQueues() {
- return queueNames;
+ public synchronized Set<String> getLeafQueueNames() {
+ return leafQueues.keySet();
}
-
+
/**
- * Return true if the given {@link Queue.QueueOperation} can be
+ * Return true if the given {@link Queue.QueueOperation} can be
* performed by the specified user on the given queue.
- *
+ * <p/>
* An operation is allowed if all users are provided access for this
* operation, or if either the user or any of the groups specified is
* provided access.
- *
- * @param queueName Queue on which the operation needs to be performed.
- * @param oper The operation to perform
- * @param ugi The user and groups who wish to perform the operation.
- *
+ *
+ * @param queueName Queue on which the operation needs to be performed.
+ * @param oper The operation to perform
+ * @param ugi The user and groups who wish to perform the operation.
* @return true if the operation is allowed, false otherwise.
*/
- public synchronized boolean hasAccess(String queueName,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
+ public synchronized boolean hasAccess(
+ String queueName,
+ Queue.QueueOperation oper,
+ UserGroupInformation ugi) {
return hasAccess(queueName, null, oper, ugi);
}
-
+
/**
- * Return true if the given {@link Queue.QueueOperation} can be
+ * Return true if the given {@link Queue.QueueOperation} can be
* performed by the specified user on the specified job in the given queue.
- *
- * An operation is allowed either if the owner of the job is the user
+ * <p/>
+ * An operation is allowed either if the owner of the job is the user
* performing the task, all users are provided access for this
* operation, or if either the user or any of the groups specified is
* provided access.
- *
- * If the {@link Queue.QueueOperation} is not job specific then the
+ * <p/>
+ * If the {@link Queue.QueueOperation} is not job specific then the
* job parameter is ignored.
- *
+ *
* @param queueName Queue on which the operation needs to be performed.
- * @param job The {@link JobInProgress} on which the operation is being
- * performed.
- * @param oper The operation to perform
- * @param ugi The user and groups who wish to perform the operation.
- *
+ * @param job The {@link JobInProgress} on which the operation is being
+ * performed.
+ * @param oper The operation to perform
+ * @param ugi The user and groups who wish to perform the operation.
* @return true if the operation is allowed, false otherwise.
*/
- public synchronized boolean hasAccess(String queueName, JobInProgress job,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
- if (!aclsEnabled) {
- return true;
- }
+ public synchronized boolean hasAccess(
+ String queueName, JobInProgress job,
+ Queue.QueueOperation oper,
+ UserGroupInformation ugi) {
- Queue q = queues.get(queueName);
+ Queue q = leafQueues.get(queueName);
+
if (q == null) {
LOG.info("Queue " + queueName + " is not present");
return false;
}
-
+
+ if(q.getChildren() != null && !q.getChildren().isEmpty()) {
+ LOG.info("Cannot submit job to parent queue " + q.getName());
+ return false;
+ }
+
+ if (!isAclsEnabled()) {
+ return true;
+ }
+
if (LOG.isDebugEnabled()) {
- LOG.debug("checking access for : "
+ LOG.debug(
+ "checking access for : "
+ QueueManager.toFullPropertyName(queueName, oper.getAclName()));
}
if (oper.isJobOwnerAllowed()) {
- if (job != null
- && job.getJobConf().getUser().equals(ugi.getUserName())) {
+ if (job != null
+ && job.getJobConf().getUser().equals(ugi.getUserName())) {
return true;
}
}
-
+
AccessControlList acl = q.getAcls().get(
- toFullPropertyName(queueName,
- oper.getAclName()));
+ toFullPropertyName(
+ queueName,
+ oper.getAclName()));
if (acl == null) {
return false;
}
@@ -210,210 +299,200 @@
/**
* Checks whether the given queue is running or not.
+ *
* @param queueName name of the queue
* @return true, if the queue is running.
*/
synchronized boolean isRunning(String queueName) {
- Queue q = queues.get(queueName);
+ Queue q = leafQueues.get(queueName);
if (q != null) {
return q.getState().equals(QueueState.RUNNING);
}
return false;
}
-
+
/**
* Set a generic Object that represents scheduling information relevant
* to a queue.
- *
+ * <p/>
* A string representation of this Object will be used by the framework
* to display in user facing applications like the JobTracker web UI and
* the hadoop CLI.
- *
- * @param queueName queue for which the scheduling information is to be set.
+ *
+ * @param queueName queue for which the scheduling information is to be set.
* @param queueInfo scheduling information for this queue.
*/
- public synchronized void setSchedulerInfo(String queueName,
- Object queueInfo) {
- if (queues.get(queueName) != null)
- queues.get(queueName).setSchedulingInfo(queueInfo);
+ public synchronized void setSchedulerInfo(
+ String queueName,
+ Object queueInfo) {
+ if (allQueues.get(queueName) != null) {
+ allQueues.get(queueName).setSchedulingInfo(queueInfo);
+ }
}
-
+
/**
* Return the scheduler information configured for this queue.
- *
+ *
* @param queueName queue for which the scheduling information is required.
* @return The scheduling information for this queue.
- *
- * @see #setSchedulingInfo(String, Object)
*/
public synchronized Object getSchedulerInfo(String queueName) {
- if (queues.get(queueName) != null)
- return queues.get(queueName).getSchedulingInfo();
+ if (allQueues.get(queueName) != null) {
+ return allQueues.get(queueName).getSchedulingInfo();
+ }
return null;
}
-
+
+ static final String MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY =
+ "Unable to refresh queues because queue-hierarchy changed. "
+ + "Retaining existing configuration. ";
+
+ static final String MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE =
+ "Scheduler couldn't refresh it's queues with the new"
+ + " configuration properties. "
+ + "Retaining existing configuration throughout the system.";
+
/**
- * Refresh the acls and state for the configured queues in the system
- * by reading it from mapred-queues.xml.
- *
- * Previously configured queues and if or not acls are disabled is retained.
+ * Refresh acls, state and scheduler properties for the configured queues.
+ * <p/>
+ * This method reloads configuration related to queues, but does not
+ * support changes to the list of queues or hierarchy. The expected usage
+ * is that an administrator can modify the queue configuration file and
+ * fire an admin command to reload queue configuration. If there is a
+ * problem in reloading configuration, then this method guarantees that
+ * existing queue configuration is untouched and in a consistent state.
*
+ * @param schedulerRefresher
* @throws IOException when queue configuration file is invalid.
*/
- synchronized void refreshQueues(Configuration conf) throws IOException {
-
- // First check if things are configured in mapred-site.xml,
- // so we can print out a deprecation warning.
- // This check is needed only until we support the configuration
- // in mapred-site.xml
- checkDeprecation(conf);
-
- // Add the queue configuration file. Values from mapred-site.xml
- // will be overridden.
- conf.addResource(QUEUE_CONF_FILE_NAME);
-
- // Now we refresh the properties of the queues. Note that we
- // do *not* refresh the queue names or the acls flag. Instead
- // we use the older values configured for them.
- LOG.info("Refreshing acls and state for configured queues.");
- try {
- Iterator<String> itr = queueNames.iterator();
- while(itr.hasNext()) {
- String name = itr.next();
- Queue q = queues.get(name);
- Map<String, AccessControlList> newAcls = getQueueAcls(name, conf);
- QueueState newState = getQueueState(name, conf);
- q.setAcls(newAcls);
- q.setState(newState);
- }
- } catch (Throwable t) {
- String exceptionString = StringUtils.stringifyException(t);
- LOG.warn("Queues could not be refreshed because there was an " +
- "exception in parsing the configuration: "+ exceptionString +
- ". Existing ACLs/state is retained.");
- throw new IOException(exceptionString);
- }
- }
-
- // Check if queue properties are configured in the passed in
- // configuration. If yes, print out deprecation warning messages.
- private void checkDeprecation(Configuration conf) {
-
- // check if queues are defined.
- String[] queues = null;
- String queueNameValues = conf.get("mapred.queue.names");
- if (queueNameValues != null) {
- LOG.warn("Configuring \"mapred.queue.names\" in mapred-site.xml or " +
- "hadoop-site.xml is deprecated. Configure " +
- "\"mapred.queue.names\" in " +
- QUEUE_CONF_FILE_NAME);
- // store queues so we can check if ACLs are also configured
- // in the deprecated files.
- queues = conf.getStrings("mapred.queue.names");
- }
-
- // check if the acls flag is defined
- String aclsEnable = conf.get("mapred.acls.enabled");
- if (aclsEnable != null) {
- LOG.warn("Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
- "hadoop-site.xml is deprecated. Configure " +
- "\"mapred.acls.enabled\" in " +
- QUEUE_CONF_FILE_NAME);
- }
-
- // check if acls are defined
- if (queues != null) {
- for (String queue : queues) {
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String key = toFullPropertyName(queue, oper.getAclName());
- String aclString = conf.get(key);
- if (aclString != null) {
- LOG.warn("Configuring queue ACLs in mapred-site.xml or " +
- "hadoop-site.xml is deprecated. Configure queue ACLs in " +
- QUEUE_CONF_FILE_NAME);
- // even if one string is configured, it is enough for printing
- // the warning. so we can return from here.
- return;
- }
- }
+ synchronized void refreshQueues(Configuration conf,
+ QueueRefresher schedulerRefresher)
+ throws IOException {
+
+ // Create a new configuration parser using the passed conf object.
+ QueueConfigurationParser cp =
+ QueueManager.getQueueConfigurationParser(conf, true);
+
+ /*
+ * (1) Validate the refresh of properties owned by QueueManager. As of now,
+ * while refreshing queue properties, we only check that the hierarchy is
+ * the same w.r.t queue names, ACLs and state for each queue and don't
+ * support adding new queues or removing old queues
+ */
+ if (!root.isHierarchySameAs(cp.getRoot())) {
+ LOG.warn(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
+ throw new IOException(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
+ }
+
+ /*
+ * (2) QueueManager owned properties are validated. Now validate and
+ * refresh the properties of scheduler in a single step.
+ */
+ if (schedulerRefresher != null) {
+ try {
+ schedulerRefresher.refreshQueues(cp.getRoot().getJobQueueInfo().getChildren());
+ } catch (Throwable e) {
+ StringBuilder msg =
+ new StringBuilder(
+ "Scheduler's refresh-queues failed with the exception : "
+ + StringUtils.stringifyException(e));
+ msg.append("\n");
+ msg.append(MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE);
+ LOG.error(msg.toString());
+ throw new IOException(msg.toString());
}
}
- }
-
- // Parse ACLs for the queue from the configuration.
- private Map<String, AccessControlList> getQueueAcls(String name,
- Configuration conf) {
- HashMap<String, AccessControlList> map =
- new HashMap<String, AccessControlList>();
- for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
- String aclKey = toFullPropertyName(name, oper.getAclName());
- map.put(aclKey, new AccessControlList(conf.get(aclKey, "*")));
- }
- return map;
- }
-
- // Parse ACLs for the queue from the configuration.
- private QueueState getQueueState(String name, Configuration conf) {
- QueueState retState = QueueState.RUNNING;
- String stateVal = conf.get(toFullPropertyName(name,
- QueueManager.QUEUE_STATE_SUFFIX),
- QueueState.RUNNING.getStateName());
- for (QueueState state : QueueState.values()) {
- if (state.getStateName().equalsIgnoreCase(stateVal)) {
- retState = state;
- break;
- }
- }
- return retState;
- }
-
- public static final String toFullPropertyName(String queue,
- String property) {
+
+ /*
+ * (3) Scheduler has validated and refreshed its queues successfully, now
+ * refresh the properties owned by QueueManager
+ */
+
+ // First copy the scheduling information recursively into the new
+ // queue-hierarchy. This is done to retain old scheduling information. This
+ // is done after scheduler refresh and not before it because during refresh,
+ // schedulers may wish to change their scheduling info objects too.
+ cp.getRoot().copySchedulingInfo(this.root);
+
+ // Now switch roots.
+ initialize(cp);
+
+ LOG.info("Queue configuration is refreshed successfully.");
+ }
+
+ static final String toFullPropertyName(
+ String queue,
+ String property) {
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
}
+ /**
+ * Return an array of {@link JobQueueInfo} objects for all the
+ * queues configurated in the system.
+ *
+ * @return array of JobQueueInfo objects.
+ */
synchronized JobQueueInfo[] getJobQueueInfos() {
ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
- for (String queue : queueNames) {
+ for (String queue : allQueues.keySet()) {
JobQueueInfo queueInfo = getJobQueueInfo(queue);
if (queueInfo != null) {
- queueInfoList.add(getJobQueueInfo(queue));
+ queueInfoList.add(queueInfo);
}
}
- return queueInfoList.toArray(
- new JobQueueInfo[queueInfoList.size()]);
+ return queueInfoList.toArray(
+ new JobQueueInfo[queueInfoList.size()]);
}
+
+ /**
+ * Return {@link JobQueueInfo} for a given queue.
+ *
+ * @param queue name of the queue
+ * @return JobQueueInfo for the queue, null if the queue is not found.
+ */
synchronized JobQueueInfo getJobQueueInfo(String queue) {
- if (queues.get(queue) != null) {
- Object schedulingInfo = queues.get(queue).getSchedulingInfo();
- JobQueueInfo qInfo;
- if (schedulingInfo != null) {
- qInfo = new JobQueueInfo(queue, schedulingInfo.toString());
- } else {
- qInfo = new JobQueueInfo(queue, null);
- }
- qInfo.setQueueState(queues.get(queue).getState().getStateName());
- return qInfo;
+ if (allQueues.containsKey(queue)) {
+ return allQueues.get(queue).getJobQueueInfo();
}
+
return null;
}
/**
- * Generates the array of QueueAclsInfo object.
- *
+ * JobQueueInfo for all the queues.
+ * <p/>
+ * Contribs can use this data structure to either create a hierarchy or for
+ * traversing.
+ * They can also use this to refresh properties in case of refreshQueues
+ *
+ * @return a map for easy navigation.
+ */
+ synchronized Map<String, JobQueueInfo> getJobQueueInfoMapping() {
+ Map<String, JobQueueInfo> m = new HashMap<String, JobQueueInfo>();
+
+ for (String key : allQueues.keySet()) {
+ m.put(key, allQueues.get(key).getJobQueueInfo());
+ }
+
+ return m;
+ }
+
+ /**
+ * Generates the array of QueueAclsInfo object.
+ * <p/>
* The array consists of only those queues for which user has acls.
*
* @return QueueAclsInfo[]
* @throws java.io.IOException
*/
synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
- throws IOException {
+ throws IOException {
//List of all QueueAclsInfo objects , this list is returned
ArrayList<QueueAclsInfo> queueAclsInfolist =
- new ArrayList<QueueAclsInfo>();
+ new ArrayList<QueueAclsInfo>();
Queue.QueueOperation[] operations = Queue.QueueOperation.values();
- for (String queueName : queueNames) {
+ for (String queueName : leafQueues.keySet()) {
QueueAclsInfo queueAclsInfo = null;
ArrayList<String> operationsAllowed = null;
for (Queue.QueueOperation operation : operations) {
@@ -427,33 +506,229 @@
if (operationsAllowed != null) {
//There is atleast 1 operation supported for queue <queueName>
//, hence initialize queueAclsInfo
- queueAclsInfo = new QueueAclsInfo(queueName, operationsAllowed.toArray
- (new String[operationsAllowed.size()]));
+ queueAclsInfo = new QueueAclsInfo(
+ queueName, operationsAllowed.toArray
+ (new String[operationsAllowed.size()]));
queueAclsInfolist.add(queueAclsInfo);
}
}
- return queueAclsInfolist.toArray(new QueueAclsInfo[
- queueAclsInfolist.size()]);
+ return queueAclsInfolist.toArray(
+ new QueueAclsInfo[
+ queueAclsInfolist.size()]);
}
- // ONLY FOR TESTING - Do not use in production code.
+ /**
+ * ONLY FOR TESTING - Do not use in production code.
+ * This method is used for setting up of leafQueues only.
+ * We are not setting the hierarchy here.
+ *
+ * @param queues
+ */
synchronized void setQueues(Queue[] queues) {
+ root.getChildren().clear();
+ leafQueues.clear();
+ allQueues.clear();
+
for (Queue queue : queues) {
- this.queues.put(queue.getName(), queue);
+ root.addChild(queue);
+ }
+ //At this point we have root populated
+ //update data structures leafNodes.
+ leafQueues = getRoot().getLeafQueues();
+ allQueues.putAll(getRoot().getInnerQueues());
+ allQueues.putAll(leafQueues);
+ }
+
+ /**
+ * Return an array of {@link JobQueueInfo} objects for the root
+ * queues configured in the system.
+ * <p/>
+ * Root queues are queues that are at the top-most level in the
+ * hierarchy of queues in mapred-queues.xml, or they are the queues
+ * configured in the mapred.queue.names key in mapred-site.xml.
+ *
+ * @return array of JobQueueInfo objects for root level queues.
+ */
+
+ JobQueueInfo[] getRootQueues() {
+ List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
+ return list.toArray(new JobQueueInfo[list.size()]);
+ }
+
+ /**
+ * Get the complete hierarchy of children for queue
+ * queueName
+ *
+ * @param queueName
+ * @return
+ */
+ JobQueueInfo[] getChildQueues(String queueName) {
+ List<JobQueueInfo> list =
+ allQueues.get(queueName).getJobQueueInfo().getChildren();
+ if (list != null) {
+ return list.toArray(new JobQueueInfo[list.size()]);
+ } else {
+ return new JobQueueInfo[0];
}
}
/**
- * prints the configuration of QueueManager in Json format.
- * The method should be modified accordingly whenever
- * QueueManager(Configuration) constructor is modified.
- * @param writer {@link}Writer object to which the configuration properties
- * are printed in json format
+ * Used only for testing purposes .
+ * This method is unstable as refreshQueues would leave this
+ * data structure in unstable state.
+ *
+ * @param queueName
+ * @return
+ */
+ Queue getQueue(String queueName) {
+ return this.allQueues.get(queueName);
+ }
+
+
+ /**
+ * Return if ACLs are enabled for the Map/Reduce system
+ *
+ * @return true if ACLs are enabled.
+ */
+ boolean isAclsEnabled() {
+ return isAclEnabled;
+ }
+
+ /**
+ * Used only for test.
+ *
+ * @return
+ */
+ Queue getRoot() {
+ return root;
+ }
+
+ /**
+ * Dumps the configuration of hierarchy of queues
+ * @param out the writer object to which dump is written
+ * @throws IOException
+ */
+ static void dumpConfiguration(Writer out,Configuration conf) throws IOException {
+ dumpConfiguration(out, null,conf);
+ }
+
+ /***
+ * Dumps the configuration of hierarchy of queues with
+ * the xml file path given. It is to be used directly ONLY FOR TESTING.
+ * @param out the writer object to which dump is written to.
+ * @param configFile the filename of xml file
+ * @throws IOException
+ */
+ static void dumpConfiguration(Writer out, String configFile,
+ Configuration conf) throws IOException {
+ if (conf != null && conf.get(DeprecatedQueueConfigurationParser.
+ MAPRED_QUEUE_NAMES_KEY) != null) {
+ return;
+ }
+ JsonFactory dumpFactory = new JsonFactory();
+ JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
+ QueueConfigurationParser parser;
+ if (configFile != null && !"".equals(configFile)) {
+ parser = new QueueConfigurationParser(configFile);
+ }
+ else {
+ parser = QueueManager.getQueueConfigurationParser(null, false);
+ }
+ dumpGenerator.writeStartObject();
+ dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
+ dumpGenerator.writeFieldName("queues");
+ dumpGenerator.writeStartArray();
+ dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
+ dumpGenerator.writeEndArray();
+ dumpGenerator.writeEndObject();
+ dumpGenerator.flush();
+ }
+
+ /**
+ * method to perform depth-first search and write the parameters of every
+ * queue in JSON format.
+ * @param dumpGenerator JsonGenerator object which takes the dump and flushes
+ * to a writer object
+ * @param rootQueues the top-level queues
+ * @throws JsonGenerationException
* @throws IOException
*/
- static void dumpConfiguration(Writer writer) throws IOException {
- Configuration conf = new Configuration(false);
- conf.addResource(QUEUE_CONF_FILE_NAME);
- Configuration.dumpConfiguration(conf, writer);
+ private static void dumpConfiguration(JsonGenerator dumpGenerator,
+ Set<Queue> rootQueues) throws JsonGenerationException, IOException {
+ for (Queue queue : rootQueues) {
+ dumpGenerator.writeStartObject();
+ dumpGenerator.writeStringField("name", queue.getName());
+ dumpGenerator.writeStringField("state", queue.getState().toString());
+ AccessControlList submitJobList = null;
+ AccessControlList administerJobsList = null;
+ if (queue.getAcls() != null) {
+ submitJobList =
+ queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
+ Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+ administerJobsList =
+ queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
+ Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+ }
+ StringBuilder aclsSubmitJobValue = new StringBuilder();
+ if (submitJobList != null ) {
+ aclsSubmitJobValue = getAclsInfo(submitJobList);
+ }
+ dumpGenerator.writeStringField("acl_submit_job",
+ aclsSubmitJobValue.toString());
+ StringBuilder aclsAdministerValue = new StringBuilder();
+ if (administerJobsList != null) {
+ aclsAdministerValue = getAclsInfo(administerJobsList);
+ }
+ dumpGenerator.writeStringField("acl_administer_jobs",
+ aclsAdministerValue.toString());
+ dumpGenerator.writeFieldName("properties");
+ dumpGenerator.writeStartArray();
+ if (queue.getProperties() != null) {
+ for (Map.Entry<Object, Object>property :
+ queue.getProperties().entrySet()) {
+ dumpGenerator.writeStartObject();
+ dumpGenerator.writeStringField("key", (String)property.getKey());
+ dumpGenerator.writeStringField("value", (String)property.getValue());
+ dumpGenerator.writeEndObject();
+ }
+ }
+ dumpGenerator.writeEndArray();
+ Set<Queue> childQueues = queue.getChildren();
+ dumpGenerator.writeFieldName("children");
+ dumpGenerator.writeStartArray();
+ if (childQueues != null && childQueues.size() > 0) {
+ dumpConfiguration(dumpGenerator, childQueues);
+ }
+ dumpGenerator.writeEndArray();
+ dumpGenerator.writeEndObject();
+ }
+ }
+
+ private static StringBuilder getAclsInfo(AccessControlList accessControlList) {
+ StringBuilder sb = new StringBuilder();
+ if (accessControlList.getUsers() != null &&
+ accessControlList.getUsers().size() > 0) {
+ Set<String> users = accessControlList.getUsers();
+ Iterator<String> iterator = users.iterator();
+ while (iterator.hasNext()) {
+ sb.append(iterator.next());
+ if (iterator.hasNext()) {
+ sb.append(",");
+ }
+ }
+ }
+ if (accessControlList.getGroups() != null &&
+ accessControlList.getGroups().size() > 0) {
+ sb.append(" ");
+ Set<String> groups = accessControlList.getGroups();
+ Iterator<String> iterator = groups.iterator();
+ while (iterator.hasNext()) {
+ sb.append(iterator.next());
+ if (iterator.hasNext()) {
+ sb.append(",");
+ }
+ }
+ }
+ return sb;
}
}