You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/09/17 09:34:40 UTC
svn commit: r998003 [2/3] - in /hadoop/mapreduce/trunk: ./ conf/
src/c++/task-controller/ src/c++/task-controller/tests/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/docs/src...
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Fri Sep 17 07:34:39 2010
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.Queue.QueueOperation;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.authorize.AccessControlList;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -72,11 +72,17 @@ class QueueConfigurationParser {
static final String QUEUE_TAG = "queue";
static final String ACL_SUBMIT_JOB_TAG = "acl-submit-job";
static final String ACL_ADMINISTER_JOB_TAG = "acl-administer-jobs";
+
+ // The value read from the queues config file for this tag is ignored.
+ // To enable queue acls and job acls, mapreduce.cluster.acls.enabled must
+ // be set to true in mapred-site.xml
+ @Deprecated
+ static final String ACLS_ENABLED_TAG = "aclsEnabled";
+
static final String PROPERTIES_TAG = "properties";
static final String STATE_TAG = "state";
static final String QUEUE_NAME_TAG = "name";
static final String QUEUES_TAG = "queues";
- static final String ACLS_ENABLED_TAG = "aclsEnabled";
static final String PROPERTY_TAG = "property";
static final String KEY_TAG = "key";
static final String VALUE_TAG = "value";
@@ -88,7 +94,8 @@ class QueueConfigurationParser {
}
- QueueConfigurationParser(String confFile) {
+ QueueConfigurationParser(String confFile, boolean areAclsEnabled) {
+ aclsEnabled = areAclsEnabled;
File file = new File(confFile).getAbsoluteFile();
if (!file.exists()) {
throw new RuntimeException("Configuration file not found at " +
@@ -105,7 +112,8 @@ class QueueConfigurationParser {
}
}
- QueueConfigurationParser(InputStream xmlInput) {
+ QueueConfigurationParser(InputStream xmlInput, boolean areAclsEnabled) {
+ aclsEnabled = areAclsEnabled;
loadFrom(xmlInput);
}
@@ -184,8 +192,14 @@ class QueueConfigurationParser {
NamedNodeMap nmp = queuesNode.getAttributes();
Node acls = nmp.getNamedItem(ACLS_ENABLED_TAG);
- if (acls != null && acls.getTextContent().equalsIgnoreCase("true")) {
- setAclsEnabled(true);
+ if (acls != null) {
+ LOG.warn("Configuring " + ACLS_ENABLED_TAG + " flag in " +
+ QueueManager.QUEUE_CONF_FILE_NAME + " is not valid. " +
+ "This tag is ignored. Configure " +
+ MRConfig.MR_ACLS_ENABLED + " in mapred-site.xml. See the " +
+ " documentation of " + MRConfig.MR_ACLS_ENABLED +
+ ", which is used for enabling job level authorization and " +
+ " queue level authorization.");
}
NodeList props = queuesNode.getChildNodes();
@@ -269,9 +283,9 @@ class QueueConfigurationParser {
name += nameValue;
newQueue.setName(name);
submitKey = toFullPropertyName(name,
- Queue.QueueOperation.SUBMIT_JOB.getAclName());
+ QueueACL.SUBMIT_JOB.getAclName());
adminKey = toFullPropertyName(name,
- Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
+ QueueACL.ADMINISTER_JOBS.getAclName());
}
if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -299,11 +313,11 @@ class QueueConfigurationParser {
}
if (!acls.containsKey(submitKey)) {
- acls.put(submitKey, new AccessControlList("*"));
+ acls.put(submitKey, new AccessControlList(" "));
}
if (!acls.containsKey(adminKey)) {
- acls.put(adminKey, new AccessControlList("*"));
+ acls.put(adminKey, new AccessControlList(" "));
}
//Set acls
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java Fri Sep 17 07:34:39 2010
@@ -20,11 +20,11 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -39,7 +39,6 @@ import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.List;
@@ -82,24 +81,25 @@ import java.net.URL;
* in mapred-site.xml. However, when configured in the latter, there is
* no support for hierarchical queues.
*/
-
-class QueueManager {
+@InterfaceAudience.Private
+public class QueueManager {
private static final Log LOG = LogFactory.getLog(QueueManager.class);
// Map of a queue name and Queue object
private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
private Map<String, Queue> allQueues = new HashMap<String, Queue>();
- static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+ public static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";
- // Prefix in configuration for queue related keys
- static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
- = "mapred.queue.";
- //Resource in which queue acls are configured.
+ //Prefix in configuration for queue related keys
+ static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";
+ //Resource in which queue acls are configured.
private Queue root = null;
- private boolean isAclEnabled = false;
+
+ // indicates whether job and queue acls are enabled on the mapreduce cluster
+ private boolean areAclsEnabled = false;
/**
* Factory method to create an appropriate instance of a queue
@@ -117,7 +117,7 @@ class QueueManager {
* @return Queue configuration parser
*/
static QueueConfigurationParser getQueueConfigurationParser(
- Configuration conf, boolean reloadConf) {
+ Configuration conf, boolean reloadConf, boolean areAclsEnabled) {
if (conf != null && conf.get(
DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
if (reloadConf) {
@@ -136,7 +136,8 @@ class QueueManager {
InputStream stream = null;
try {
stream = xmlInUrl.openStream();
- return new QueueConfigurationParser(new BufferedInputStream(stream));
+ return new QueueConfigurationParser(new BufferedInputStream(stream),
+ areAclsEnabled);
} catch (IOException ioe) {
throw new RuntimeException("Couldn't open queue configuration at " +
xmlInUrl, ioe);
@@ -146,8 +147,13 @@ class QueueManager {
}
}
- public QueueManager() {
- initialize(getQueueConfigurationParser(null, false));
+ QueueManager() {// acls are disabled
+ this(false);
+ }
+
+ QueueManager(boolean areAclsEnabled) {
+ this.areAclsEnabled = areAclsEnabled;
+ initialize(getQueueConfigurationParser(null, false, areAclsEnabled));
}
/**
@@ -159,10 +165,11 @@ class QueueManager {
* is found in mapred-site.xml, it will then look for site configuration
* in mapred-queues.xml supporting hierarchical queues.
*
- * @param conf Configuration object where queue configuration is specified.
+ * @param clusterConf mapreduce cluster configuration
*/
- public QueueManager(Configuration conf) {
- initialize(getQueueConfigurationParser(conf, false));
+ public QueueManager(Configuration clusterConf) {
+ areAclsEnabled = clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ initialize(getQueueConfigurationParser(clusterConf, false, areAclsEnabled));
}
/**
@@ -174,8 +181,10 @@ class QueueManager {
*
* @param confFile File where the queue configuration is found.
*/
- QueueManager(String confFile) {
- QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+ QueueManager(String confFile, boolean areAclsEnabled) {
+ this.areAclsEnabled = areAclsEnabled;
+ QueueConfigurationParser cp =
+ new QueueConfigurationParser(confFile, areAclsEnabled);
initialize(cp);
}
@@ -196,10 +205,8 @@ class QueueManager {
allQueues.putAll(leafQueues);
LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
- this.isAclEnabled = cp.isAclsEnabled();
}
-
/**
* Return the set of leaf level queues configured in the system to
* which jobs are submitted.
@@ -215,52 +222,22 @@ class QueueManager {
}
/**
- * Return true if the given {@link Queue.QueueOperation} can be
- * performed by the specified user on the given queue.
+ * Return true if the given user is part of the ACL for the given
+ * {@link QueueACL} name for the given queue.
* <p/>
* An operation is allowed if all users are provided access for this
* operation, or if either the user or any of the groups specified is
* provided access.
*
* @param queueName Queue on which the operation needs to be performed.
- * @param oper The operation to perform
+ * @param qACL The queue ACL name to be checked
* @param ugi The user and groups who wish to perform the operation.
- * @return true if the operation is allowed, false otherwise.
+ * @return true if the operation is allowed, false otherwise.
*/
public synchronized boolean hasAccess(
- String queueName,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
- return hasAccess(queueName, null, oper, ugi);
- }
+ String queueName, QueueACL qACL, UserGroupInformation ugi) {
- /**
- * Return true if the given {@link Queue.QueueOperation} can be
- * performed by the specified user on the specified job in the given queue.
- * <p/>
- * An operation is allowed either if the owner of the job is the user
- * performing the task, all users are provided access for this
- * operation, or if either the user or any of the groups specified is
- * provided access.
- * <p/>
- * If the {@link Queue.QueueOperation} is not job specific then the
- * job parameter is ignored.
- *
- * @param queueName Queue on which the operation needs to be performed.
- * @param job The {@link JobInProgress} on which the operation is being
- * performed.
- * @param oper The operation to perform
- * @param ugi The user and groups who wish to perform the operation.
- * @return true if the operation is allowed, false otherwise.
- */
- public synchronized boolean hasAccess(
- String queueName, JobInProgress job,
- Queue.QueueOperation oper,
- UserGroupInformation ugi) {
-
Queue q = leafQueues.get(queueName);
- String user = ugi.getShortUserName();
- String jobId = job == null ? "-" : job.getJobID().toString();
if (q == null) {
LOG.info("Queue " + queueName + " is not present");
@@ -272,50 +249,23 @@ class QueueManager {
return false;
}
- if (!isAclsEnabled()) {
+ if (!areAclsEnabled()) {
return true;
}
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "checking access for : "
- + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
- }
-
- if (oper.isJobOwnerAllowed()) {
- if (job != null
- && job.getJobConf().getUser().equals(ugi.getShortUserName())) {
- AuditLogger.logSuccess(user, oper.name(), queueName);
- return true;
- }
+ LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
+ qACL.getAclName()) + " for user " + ugi.getShortUserName());
}
AccessControlList acl = q.getAcls().get(
- toFullPropertyName(
- queueName,
- oper.getAclName()));
+ toFullPropertyName(queueName, qACL.getAclName()));
if (acl == null) {
- AuditLogger.logFailure(user, oper.name(), null, queueName,
- "Disabled queue ACLs, job : " + jobId);
return false;
}
- // Check the ACL list
- boolean allowed = acl.isAllAllowed();
- if (!allowed) {
- // Check the allowed users list
- if (acl.isUserAllowed(ugi)) {
- allowed = true;
- }
- }
- if (allowed) {
- AuditLogger.logSuccess(user, oper.name(), queueName);
- } else {
- AuditLogger.logFailure(user, oper.name(), null, queueName,
- Constants.UNAUTHORIZED_USER + ", job : " + jobId);
- }
-
- return allowed;
+ // Check if user is part of the ACL
+ return acl.isUserAllowed(ugi);
}
/**
@@ -392,7 +342,7 @@ class QueueManager {
// Create a new configuration parser using the passed conf object.
QueueConfigurationParser cp =
- QueueManager.getQueueConfigurationParser(conf, true);
+ getQueueConfigurationParser(conf, true, areAclsEnabled);
/*
* (1) Validate the refresh of properties owned by QueueManager. As of now,
@@ -441,7 +391,8 @@ class QueueManager {
LOG.info("Queue configuration is refreshed successfully.");
}
- static final String toFullPropertyName(
+ // this method is for internal use only
+ public static final String toFullPropertyName(
String queue,
String property) {
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -512,16 +463,16 @@ class QueueManager {
//List of all QueueAclsInfo objects , this list is returned
ArrayList<QueueAclsInfo> queueAclsInfolist =
new ArrayList<QueueAclsInfo>();
- Queue.QueueOperation[] operations = Queue.QueueOperation.values();
+ QueueACL[] qAcls = QueueACL.values();
for (String queueName : leafQueues.keySet()) {
QueueAclsInfo queueAclsInfo = null;
ArrayList<String> operationsAllowed = null;
- for (Queue.QueueOperation operation : operations) {
- if (hasAccess(queueName, operation, ugi)) {
+ for (QueueACL qAcl : qAcls) {
+ if (hasAccess(queueName, qAcl, ugi)) {
if (operationsAllowed == null) {
operationsAllowed = new ArrayList<String>();
}
- operationsAllowed.add(operation.getAclName());
+ operationsAllowed.add(qAcl.getAclName());
}
}
if (operationsAllowed != null) {
@@ -534,8 +485,7 @@ class QueueManager {
}
}
return queueAclsInfolist.toArray(
- new QueueAclsInfo[
- queueAclsInfolist.size()]);
+ new QueueAclsInfo[queueAclsInfolist.size()]);
}
/**
@@ -611,8 +561,8 @@ class QueueManager {
*
* @return true if ACLs are enabled.
*/
- boolean isAclsEnabled() {
- return isAclEnabled;
+ boolean areAclsEnabled() {
+ return areAclsEnabled;
}
/**
@@ -623,7 +573,30 @@ class QueueManager {
Queue getRoot() {
return root;
}
-
+
+ /**
+ * Returns the specific queue ACL for the given queue.
+ * Returns null if the given queue does not exist or the acl is not
+ * configured for that queue.
+ * If acls are disabled (mapreduce.cluster.acls.enabled set to false),
+ * returns an ACL that allows all users.
+ */
+ synchronized AccessControlList getQueueACL(String queueName,
+ QueueACL qACL) {
+ if (areAclsEnabled) {
+ Queue q = leafQueues.get(queueName);
+ if (q != null) {
+ return q.getAcls().get(toFullPropertyName(
+ queueName, qACL.getAclName()));
+ }
+ else {
+ LOG.warn("Queue " + queueName + " is not present.");
+ return null;
+ }
+ }
+ return new AccessControlList("*");
+ }
+
/**
* Dumps the configuration of hierarchy of queues
* @param out the writer object to which dump is written
@@ -646,17 +619,21 @@ class QueueManager {
MAPRED_QUEUE_NAMES_KEY) != null) {
return;
}
+
JsonFactory dumpFactory = new JsonFactory();
JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
QueueConfigurationParser parser;
+ boolean aclsEnabled = false;
+ if (conf != null) {
+ aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+ }
if (configFile != null && !"".equals(configFile)) {
- parser = new QueueConfigurationParser(configFile);
+ parser = new QueueConfigurationParser(configFile, aclsEnabled);
}
else {
- parser = QueueManager.getQueueConfigurationParser(null, false);
+ parser = getQueueConfigurationParser(null, false, aclsEnabled);
}
dumpGenerator.writeStartObject();
- dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
dumpGenerator.writeFieldName("queues");
dumpGenerator.writeStartArray();
dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
@@ -684,11 +661,11 @@ class QueueManager {
AccessControlList administerJobsList = null;
if (queue.getAcls() != null) {
submitJobList =
- queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
- Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+ queue.getAcls().get(toFullPropertyName(queue.getName(),
+ QueueACL.SUBMIT_JOB.getAclName()));
administerJobsList =
- queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
- Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+ queue.getAcls().get(toFullPropertyName(queue.getName(),
+ QueueACL.ADMINISTER_JOBS.getAclName()));
}
String aclsSubmitJobValue = " ";
if (submitJobList != null ) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Fri Sep 17 07:34:39 2010
@@ -30,9 +30,9 @@ import javax.servlet.http.HttpServletRes
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -121,42 +121,54 @@ public class TaskLogServlet extends Http
* users and groups specified in configuration using
* mapreduce.job.acl-view-job to view job.
*/
- private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+ private void checkAccessForTaskLogs(JobConf conf, String user, String jobId,
TaskTracker tracker) throws AccessControlException {
- if (!tracker.isJobLevelAuthorizationEnabled()) {
+ if (!tracker.areACLsEnabled()) {
return;
}
- // buiild job view acl by reading from conf
+ // build job view ACL by reading from conf
AccessControlList jobViewACL = tracker.getJobACLsManager().
constructJobACLs(conf).get(JobACL.VIEW_JOB);
+ // read job queue name from conf
+ String queue = conf.getQueueName();
+
+ // build queue admins ACL by reading from conf
+ AccessControlList queueAdminsACL = new AccessControlList(
+ conf.get(toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName()), " "));
+
String jobOwner = conf.get(JobContext.USER_NAME);
UserGroupInformation callerUGI =
UserGroupInformation.createRemoteUser(user);
- tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
- jobOwner, jobViewACL);
+ // check if user is queue admin or cluster admin or jobOwner or member of
+ // job-view-acl
+ if (!queueAdminsACL.isUserAllowed(callerUGI)) {
+ tracker.getACLsManager().checkAccess(jobId, callerUGI, queue,
+ Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL);
+ }
}
/**
- * Builds a Configuration object by reading the xml file.
+ * Builds a JobConf object by reading the job-acls.xml file.
* This doesn't load the default resources.
*
- * Returns null if job-acls.xml is not there in userlogs/$jobid/attempt-dir on
+ * Returns null if job-acls.xml is not there in userlogs/$jobid on
* local file system. This can happen when we restart the cluster with job
* level authorization enabled(but was disabled on earlier cluster) and
* viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
* cluster).
*/
- static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
- boolean isCleanup) {
+ static JobConf getConfFromJobACLsFile(JobID jobId) {
Path jobAclsFilePath = new Path(
- TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
- Configuration conf = null;
+ TaskLog.getJobDir(jobId).toString(),
+ TaskTracker.jobACLsFile);
+ JobConf conf = null;
if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
- conf = new Configuration(false);
+ conf = new JobConf(false);
conf.addResource(jobAclsFilePath);
}
return conf;
@@ -228,15 +240,15 @@ public class TaskLogServlet extends Http
ServletContext context = getServletContext();
TaskTracker taskTracker = (TaskTracker) context.getAttribute(
"task.tracker");
+ JobID jobId = attemptId.getJobID();
+
// get jobACLConf from ACLs file
- Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+ JobConf jobACLConf = getConfFromJobACLsFile(jobId);
// Ignore authorization if job-acls.xml is not found
if (jobACLConf != null) {
- JobID jobId = attemptId.getJobID();
-
try {
- checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
- taskTracker);
+ checkAccessForTaskLogs(jobACLConf, user,
+ jobId.toString(), taskTracker);
} catch (AccessControlException e) {
String errMsg = "User " + user + " failed to view tasklogs of job " +
jobId + "!\n\n" + e.getMessage();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 17 07:34:39 2010
@@ -19,7 +19,6 @@ package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -34,7 +33,6 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -77,8 +75,6 @@ abstract class TaskRunner extends Thread
protected JobConf conf;
JvmManager jvmManager;
- static String jobACLsFile = "job-acl.xml";
-
public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
JobConf conf) {
this.tip = tip;
@@ -286,32 +282,8 @@ abstract class TaskRunner extends Thread
Localizer.PermissionsHandler.setPermissions(logDir,
Localizer.PermissionsHandler.sevenZeroZero);
}
- // write job acls into a file to know the access for task logs
- writeJobACLs(logDir);
- return logFiles;
- }
- // Writes job-view-acls and user name into an xml file
- private void writeJobACLs(File logDir) throws IOException {
- File aclFile = new File(logDir, TaskRunner.jobACLsFile);
- Configuration aclConf = new Configuration(false);
-
- // set the job view acls in aclConf
- String jobViewACLs = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB);
- if (jobViewACLs != null) {
- aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACLs);
- }
- // set jobOwner as mapreduce.job.user.name in aclConf
- String jobOwner = conf.getUser();
- aclConf.set(MRJobConfig.USER_NAME, jobOwner);
- FileOutputStream out = new FileOutputStream(aclFile);
- try {
- aclConf.writeXml(out);
- } finally {
- out.close();
- }
- Localizer.PermissionsHandler.setPermissions(aclFile,
- Localizer.PermissionsHandler.sevenZeroZero);
+ return logFiles;
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 17 07:34:39 2010
@@ -20,6 +20,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
@@ -78,6 +79,7 @@ import org.apache.hadoop.mapred.TaskTrac
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -160,6 +162,11 @@ public class TaskTracker
public static final Log ClientTraceLog =
LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
+ // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
+ // each job at job localization time. This will be used by TaskLogServlet for
+ // authorizing viewing of task logs of that job
+ static String jobACLsFile = "job-acls.xml";
+
volatile boolean running = true;
private LocalDirAllocator localDirAllocator;
@@ -249,9 +256,7 @@ public class TaskTracker
private int maxReduceSlots;
private int failures;
- // MROwner's ugi
- private UserGroupInformation mrOwner;
- private String supergroup;
+ private ACLsManager aclsManager;
// Performance-related config knob to send an out-of-band heartbeat
// on task completion
@@ -275,9 +280,6 @@ public class TaskTracker
private long reservedPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
- // Manages job acls of jobs in TaskTracker
- private TaskTrackerJobACLsManager jobACLsManager;
-
/**
* the minimum interval between jobtracker polls
*/
@@ -590,13 +592,11 @@ public class TaskTracker
* close().
*/
synchronized void initialize() throws IOException, InterruptedException {
- UserGroupInformation.setConfiguration(fConf);
- SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
- mrOwner = UserGroupInformation.getCurrentUser();
- supergroup = fConf.get(MRConfig.MR_SUPERGROUP, "supergroup");
- LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
- + " and supergroup as " + supergroup);
+ aclsManager = new ACLsManager(fConf, new JobACLsManager(fConf), null);
+ LOG.info("Starting tasktracker with owner as " +
+ getMROwner().getShortUserName() + " and supergroup as " +
+ getSuperGroup());
localFs = FileSystem.getLocal(fConf);
// use configured nameserver & interface to get local hostname
@@ -687,7 +687,8 @@ public class TaskTracker
this.distributedCacheManager.startCleanupThread();
this.jobClient = (InterTrackerProtocol)
- mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
+ UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
return RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
@@ -735,19 +736,22 @@ public class TaskTracker
}
UserGroupInformation getMROwner() {
- return mrOwner;
+ return aclsManager.getMROwner();
}
String getSuperGroup() {
- return supergroup;
+ return aclsManager.getSuperGroup();
}
-
+
+ boolean isMRAdmin(UserGroupInformation ugi) {
+ return aclsManager.isMRAdmin(ugi);
+ }
+
/**
- * Is job level authorization enabled on the TT ?
+ * Are ACLs for authorization checks enabled on the MR cluster ?
*/
- boolean isJobLevelAuthorizationEnabled() {
- return fConf.getBoolean(
- MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+ boolean areACLsEnabled() {
+ return fConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
}
public static Class<?>[] getInstrumentationClasses(Configuration conf) {
@@ -998,7 +1002,7 @@ public class TaskTracker
JobConf localJobConf = localizeJobFiles(t, rjob);
// initialize job log directory
- initializeJobLogDir(jobId);
+ initializeJobLogDir(jobId, localJobConf);
// Now initialize the job via task-controller so as to set
// ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -1098,12 +1102,64 @@ public class TaskTracker
return localJobConf;
}
- // create job userlog dir
- void initializeJobLogDir(JobID jobId) {
+ // Create job userlog dir.
+ // Create job acls file in job log dir, if needed.
+ void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+ throws IOException {
// remove it from tasklog cleanup thread first,
// it might be added there because of tasktracker reinit or restart
taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
localizer.initializeJobLogDir(jobId);
+
+ if (areACLsEnabled()) {
+ // Create job-acls.xml file in job userlog dir and write the needed
+ // info for authorization of users for viewing task logs of this job.
+ writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
+ }
+ }
+
+ /**
+ * Creates job-acls.xml under the given directory logDir and writes
+ * job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+ * file.
+ * queue name is the queue to which the job was submitted.
+ * queue-admins-acl is the queue admins ACL of the queue to which this
+ * job was submitted.
+ * @param conf job configuration
+ * @param logDir job userlog dir
+ * @throws IOException
+ */
+ private static void writeJobACLs(JobConf conf, File logDir)
+ throws IOException {
+ File aclFile = new File(logDir, jobACLsFile);
+ JobConf aclConf = new JobConf(false);
+
+ // set the job view acl in aclConf
+ String jobViewACL = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
+ aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACL);
+
+ // set the job queue name in aclConf
+ String queue = conf.getQueueName();
+ aclConf.setQueueName(queue);
+
+ // set the queue admins acl in aclConf
+ String qACLName = toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName());
+ String queueAdminsACL = conf.get(qACLName, " ");
+ aclConf.set(qACLName, queueAdminsACL);
+
+ // set jobOwner as user.name in aclConf
+ String jobOwner = conf.getUser();
+ aclConf.set("user.name", jobOwner);
+
+ FileOutputStream out = new FileOutputStream(aclFile);
+ try {
+ aclConf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ Localizer.PermissionsHandler.setPermissions(aclFile,
+ Localizer.PermissionsHandler.sevenZeroZero);
}
/**
@@ -1318,8 +1374,10 @@ public class TaskTracker
checkJettyPort(httpPort);
// create task log cleanup thread
setTaskLogCleanupThread(new UserLogCleaner(fConf));
- // Initialize the jobACLSManager
- jobACLsManager = new TaskTrackerJobACLsManager(this);
+
+ UserGroupInformation.setConfiguration(fConf);
+ SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
+
initialize();
}
@@ -4043,7 +4101,11 @@ public class TaskTracker
return localJobTokenFileStr;
}
- TaskTrackerJobACLsManager getJobACLsManager() {
- return jobACLsManager;
+ JobACLsManager getJobACLsManager() {
+ return aclsManager.getJobACLsManager();
+ }
+
+ ACLsManager getACLsManager() {
+ return aclsManager;
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Sep 17 07:34:39 2010
@@ -41,12 +41,15 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.QueueACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.codehaus.jackson.JsonParseException;
@@ -355,6 +358,14 @@ class JobSubmitter {
conf.setInt("mapred.map.tasks", maps);
LOG.info("number of splits:" + maps);
+ // write "queue admins of the queue to which job is being submitted"
+ // to job file.
+ String queue = conf.get(MRJobConfig.QUEUE_NAME,
+ JobConf.DEFAULT_QUEUE_NAME);
+ AccessControlList acl = submitClient.getQueueAdmins(queue);
+ conf.set(toFullPropertyName(queue,
+ QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
+
// Write job file to submit dir
writeConf(conf, submitJobFile);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Sep 17 07:34:39 2010
@@ -37,8 +37,7 @@ public interface MRConfig {
public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
public static final String REDUCEMEMORY_MB =
"mapreduce.cluster.reducememory.mb";
- public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG =
- "mapreduce.cluster.job-authorization-enabled";
+ public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
public static final String MR_SUPERGROUP =
"mapreduce.cluster.permissions.supergroup";
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java Fri Sep 17 07:34:39 2010
@@ -60,7 +60,7 @@ public class QueueInfo implements Writab
*/
public QueueInfo() {
// make it running by default.
- this.queueState = queueState.RUNNING;
+ this.queueState = QueueState.RUNNING;
children = new ArrayList<QueueInfo>();
props = new Properties();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Fri Sep 17 07:34:39 2010
@@ -77,8 +77,9 @@
{"name": "jobConfPath", "type": "string"},
{"name": "acls", "type": {"type": "map",
"values": "string"
- }
- }
+ }
+ },
+ {"name": "jobQueueName", "type": "string"}
]
},
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Sep 17 07:34:39 2010
@@ -314,6 +314,7 @@ public class JobHistoryParser {
info.submitTime = event.getSubmitTime();
info.jobConfPath = event.getJobConfPath();
info.jobACLs = event.getJobAcls();
+ info.jobQueueName = event.getJobQueueName();
}
/**
@@ -325,6 +326,7 @@ public class JobHistoryParser {
JobID jobid;
String username;
String jobname;
+ String jobQueueName;
String jobConfPath;
long launchTime;
int totalMaps;
@@ -349,7 +351,7 @@ public class JobHistoryParser {
submitTime = launchTime = finishTime = -1;
totalMaps = totalReduces = failedMaps = failedReduces = 0;
finishedMaps = finishedReduces = 0;
- username = jobname = jobConfPath = "";
+ username = jobname = jobConfPath = jobQueueName = "";
tasksMap = new HashMap<TaskID, TaskInfo>();
jobACLs = new HashMap<JobACL, AccessControlList>();
}
@@ -358,6 +360,7 @@ public class JobHistoryParser {
public void printAll() {
System.out.println("JOBNAME: " + jobname);
System.out.println("USERNAME: " + username);
+ System.out.println("JOB_QUEUE_NAME: " + jobQueueName);
System.out.println("SUBMIT_TIME" + submitTime);
System.out.println("LAUNCH_TIME: " + launchTime);
System.out.println("JOB_STATUS: " + jobStatus);
@@ -383,6 +386,8 @@ public class JobHistoryParser {
public String getUsername() { return username; }
/** Get the job name */
public String getJobname() { return jobname; }
+ /** Get the job queue name */
+ public String getJobQueueName() { return jobQueueName; }
/** Get the path for the job configuration file */
public String getJobConfPath() { return jobConfPath; }
/** Get the job launch time */
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Fri Sep 17 07:34:39 2010
@@ -40,18 +40,6 @@ public class JobSubmittedEvent implement
private JobSubmitted datum = new JobSubmitted();
/**
- * @deprecated Use
- * {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
- * instead.
- */
- @Deprecated
- public JobSubmittedEvent(JobID id, String jobName, String userName,
- long submitTime, String jobConfPath) {
- this(id, jobName, userName, submitTime, jobConfPath,
- new HashMap<JobACL, AccessControlList>());
- }
-
- /**
* Create an event to record job submission
* @param id The job Id of the job
* @param jobName Name of the job
@@ -59,10 +47,11 @@ public class JobSubmittedEvent implement
* @param submitTime Time of submission
* @param jobConfPath Path of the Job Configuration file
* @param jobACLs The configured acls for the job.
+ * @param jobQueueName The job-queue to which this job was submitted
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
- Map<JobACL, AccessControlList> jobACLs) {
+ Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
datum.jobid = new Utf8(id.toString());
datum.jobName = new Utf8(jobName);
datum.userName = new Utf8(userName);
@@ -74,6 +63,9 @@ public class JobSubmittedEvent implement
entry.getValue().getAclString()));
}
datum.acls = jobAcls;
+ if (jobQueueName != null) {
+ datum.jobQueueName = new Utf8(jobQueueName);
+ }
}
JobSubmittedEvent() {}
@@ -87,6 +79,13 @@ public class JobSubmittedEvent implement
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
/** Get the Job name */
public String getJobName() { return datum.jobName.toString(); }
+ /** Get the Job queue name */
+ public String getJobQueueName() {
+ if (datum.jobQueueName != null) {
+ return datum.jobQueueName.toString();
+ }
+ return null;
+ }
/** Get the user name */
public String getUserName() { return datum.userName.toString(); }
/** Get the submit time */
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Sep 17 07:34:39 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.serve
import org.apache.hadoop.mapreduce.server.jobtracker.State;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
@@ -108,8 +109,10 @@ public interface ClientProtocol extends
* Version 32: Added delegation tokens (add, renew, cancel)
* Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
* Version 34: Modified submitJob to use Credentials instead of TokenStorage.
+ * Version 35: Added the method getQueueAdmins(queueName) as part of
+ * MAPREDUCE-1664.
*/
- public static final long versionID = 34L;
+ public static final long versionID = 35L;
/**
* Allocate a name for the job.
@@ -144,6 +147,17 @@ public interface ClientProtocol extends
public long getTaskTrackerExpiryInterval() throws IOException,
InterruptedException;
+
+ /**
+ * Get the administrators of the given job-queue.
+ * This method is for hadoop internal use only.
+ * @param queueName
+ * @return Queue administrators ACL for the queue to which the job is
+ * submitted
+ * @throws IOException
+ */
+ public AccessControlList getQueueAdmins(String queueName) throws IOException;
+
/**
* Kill the indicated job
*/
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Sep 17 07:34:39 2010
@@ -54,6 +54,9 @@ public class ConfigUtil {
new String[] {MRConfig.MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.reduce.memory.mb",
new String[] {MRConfig.REDUCEMEMORY_MB});
+ Configuration.addDeprecation("mapred.acls.enabled",
+ new String[] {MRConfig.MR_ACLS_ENABLED});
+
Configuration.addDeprecation("mapred.cluster.max.map.memory.mb",
new String[] {JTConfig.JT_MAX_MAPMEMORY_MB});
Configuration.addDeprecation("mapred.cluster.max.reduce.memory.mb",
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java Fri Sep 17 07:34:39 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueState;
@@ -31,15 +30,6 @@ import org.apache.hadoop.mapreduce.serve
import org.apache.hadoop.security.UserGroupInformation;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import static org.apache.hadoop.mapred.Queue.*;
-import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
-
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import javax.xml.transform.Transformer;
@@ -53,13 +43,18 @@ import java.util.Properties;
import java.util.Set;
import java.io.File;
import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
//@Private
public class QueueManagerTestUtils {
- final static String CONFIG = new File("test-mapred-queues.xml")
- .getAbsolutePath();
+ /**
+ * Queue-configuration file for tests that start a cluster and wish to modify
+ * the queue configuration. This file is always in the unit tests classpath,
+ * so QueueManager started through JobTracker will automatically pick this up.
+ */
+ public static final String QUEUES_CONFIG_FILE_PATH = new File(System
+ .getProperty("test.build.extraconf", "build/test/extraconf"),
+ QueueManager.QUEUE_CONF_FILE_NAME).getAbsolutePath();
+
private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
/**
@@ -76,7 +71,7 @@ public class QueueManagerTestUtils {
}
public static void createSimpleDocument(Document doc) throws Exception {
- Element queues = createQueuesNode(doc, "true");
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -106,8 +101,8 @@ public class QueueManagerTestUtils {
queues.appendChild(p1);
}
- static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
- Element queues = createQueuesNode(doc, aclsEnabled);
+ static void createSimpleDocumentWithAcls(Document doc) {
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -150,8 +145,56 @@ public class QueueManagerTestUtils {
queues.appendChild(p1);
}
+ /**
+ * Creates all given queues as 1st level queues(no nesting)
+ * @param doc the queues config document
+ * @param queueNames the queues to be added to the queues config document
+ * @param submitAcls acl-submit-job acls for each of the queues
+ * @param adminsAcls acl-administer-jobs acls for each of the queues
+ * @throws Exception
+ */
+ public static void createSimpleDocument(Document doc, String[] queueNames,
+ String[] submitAcls, String[] adminsAcls) throws Exception {
+
+ Element queues = createQueuesNode(doc);
+
+ // Create all queues as 1st level queues(no nesting)
+ for (int i = 0; i < queueNames.length; i++) {
+ Element q = createQueue(doc, queueNames[i]);
+
+ q.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ q.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i]));
+ q.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i]));
+ queues.appendChild(q);
+ }
+ }
+
+ /**
+ * Creates queues configuration file with given queues at 1st level(i.e.
+ * no nesting of queues) and with the given queue acls.
+ * @param queueNames queue names which are to be configured
+ * @param submitAclStrings acl-submit-job acls for each of the queues
+ * @param adminsAclStrings acl-administer-jobs acls for each of the queues
+ * @see #QUEUES_CONFIG_FILE_PATH the file the queues configuration is written to
+ * @throws Exception
+ */
+ public static void createQueuesConfigFile(String[] queueNames,
+ String[] submitAclStrings, String[] adminsAclStrings)
+ throws Exception {
+ if (queueNames.length > submitAclStrings.length ||
+ queueNames.length > adminsAclStrings.length) {
+ LOG.error("Number of queues is more than acls given.");
+ return;
+ }
+ Document doc = createDocument();
+ createSimpleDocument(doc, queueNames, submitAclStrings, adminsAclStrings);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ }
+
public static void refreshSimpleDocument(Document doc) throws Exception {
- Element queues = createQueuesNode(doc, "true");
+ Element queues = createQueuesNode(doc);
// Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
@@ -189,10 +232,9 @@ public class QueueManagerTestUtils {
* @param enable
* @return the created element.
*/
- public static Element createQueuesNode(Document doc, String enable) {
+ public static Element createQueuesNode(Document doc) {
Element queues = doc.createElement("queues");
doc.appendChild(queues);
- queues.setAttribute("aclsEnabled", enable);
return queues;
}
@@ -240,9 +282,12 @@ public class QueueManagerTestUtils {
return propsElement;
}
- public static void checkForConfigFile() {
- if (new File(CONFIG).exists()) {
- new File(CONFIG).delete();
+ /**
+ * Delete queues configuration file if exists
+ */
+ public static void deleteQueuesConfigFile() {
+ if (new File(QUEUES_CONFIG_FILE_PATH).exists()) {
+ new File(QUEUES_CONFIG_FILE_PATH).delete();
}
}
@@ -257,7 +302,7 @@ public class QueueManagerTestUtils {
public static void writeQueueConfigurationFile(String filePath,
JobQueueInfo[] rootQueues) throws Exception {
Document doc = createDocument();
- Element queueElements = createQueuesNode(doc, String.valueOf(true));
+ Element queueElements = createQueuesNode(doc);
for (JobQueueInfo rootQ : rootQueues) {
queueElements.appendChild(QueueConfigurationParser.getQueueElement(doc,
rootQ));
@@ -265,27 +310,6 @@ public class QueueManagerTestUtils {
writeToFile(doc, filePath);
}
- static class QueueManagerConfigurationClassLoader extends ClassLoader {
- @Override
- public URL getResource(String name) {
- if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
- return super.getResource(name);
- } else {
- File resourceFile = new File(CONFIG);
- if (!resourceFile.exists()) {
- throw new IllegalStateException(
- "Queue Manager configuration file not found");
- }
- try {
- return resourceFile.toURL();
- } catch (MalformedURLException e) {
- LOG.fatal("Unable to form URL for the resource file : ");
- }
- return super.getResource(name);
- }
- }
- }
-
static Job submitSleepJob(final int numMappers, final int numReducers, final long mapSleepTime,
final long reduceSleepTime, boolean shouldComplete, String userInfo,
String queueName, Configuration clientConf) throws IOException,
@@ -329,12 +353,4 @@ public class QueueManagerTestUtils {
}
static MiniMRCluster miniMRCluster;
-
- static void setUpCluster(Configuration conf) throws IOException {
- JobConf jobConf = new JobConf(conf);
- String namenode = "file:///";
- Thread.currentThread().setContextClassLoader(
- new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
- miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
- }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Fri Sep 17 07:34:39 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
@@ -589,7 +590,7 @@ public class TestJobHistory extends Test
// Also JobACLs should be correct
if (mr.getJobTrackerRunner().getJobTracker()
- .isJobLevelAuthorizationEnabled()) {
+ .areACLsEnabled()) {
AccessControlList acl = new AccessControlList(
conf.get(JobACL.VIEW_JOB.getAclName(), " "));
assertTrue("VIEW_JOB ACL is not properly logged to history file.",
@@ -601,6 +602,9 @@ public class TestJobHistory extends Test
acl.toString().equals(
jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
}
+
+ // Validate the job queue name
+ assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
}
public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
@@ -726,10 +730,11 @@ public class TestJobHistory extends Test
/** Run a job that will be succeeded and validate its history file format
* and its content.
*/
- public void testJobHistoryFile() throws IOException {
+ public void testJobHistoryFile() throws Exception {
MiniMRCluster mr = null;
try {
JobConf conf = new JobConf();
+
// keep for less time
conf.setLong("mapred.jobtracker.retirejob.check", 1000);
conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
@@ -739,7 +744,7 @@ public class TestJobHistory extends Test
conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
// Enable ACLs so that they are logged to history
- conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
@@ -966,7 +971,8 @@ public class TestJobHistory extends Test
Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
JobSubmittedEvent jse =
- new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs);
+ new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs,
+ "default");
jh.logEvent(jse, jobId);
jh.closeWriter(jobId);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Sep 17 07:34:39 2010
@@ -63,6 +63,7 @@ public class TestJobHistoryParsing exte
"~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:\"'ZXCVBNM<>?" +
"\t\b\n\f\"\n in it";
+ String weirdJobQueueName = "my\njob\nQueue\\";
conf.setUser(username);
MiniMRCluster mr = null;
@@ -84,7 +85,7 @@ public class TestJobHistoryParsing exte
jobACLs.put(JobACL.MODIFY_JOB, modifyJobACL);
JobSubmittedEvent jse =
new JobSubmittedEvent(jobId, weirdJob, username, 12345, weirdPath,
- jobACLs);
+ jobACLs, weirdJobQueueName);
jh.logEvent(jse, jobId);
JobFinishedEvent jfe =
@@ -121,6 +122,7 @@ public class TestJobHistoryParsing exte
assertTrue (jobInfo.getUsername().equals(username));
assertTrue(jobInfo.getJobname().equals(weirdJob));
+ assertTrue(jobInfo.getJobQueueName().equals(weirdJobQueueName));
assertTrue(jobInfo.getJobConfPath().equals(weirdPath));
Map<JobACL, AccessControlList> parsedACLs = jobInfo.getJobACLs();
assertEquals(2, parsedACLs.size());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java Fri Sep 17 07:34:39 2010
@@ -17,31 +17,31 @@
*/
package org.apache.hadoop.mapred;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.io.File;
-import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.QueueInfo;
+import org.junit.After;
import org.junit.Test;
import org.w3c.dom.Document;
public class TestJobQueueClient {
+
+ @After
+ public void tearDown() throws Exception {
+ deleteQueuesConfigFile();
+ }
+
@Test
public void testQueueOrdering() throws Exception {
// create some sample queues in a hierarchy..
@@ -91,13 +91,15 @@ public class TestJobQueueClient {
@Test
public void testGetQueue() throws Exception {
- checkForConfigFile();
+
+ deleteQueuesConfigFile();
Document doc = createDocument();
- createSimpleDocumentWithAcls(doc, "true");
- writeToFile(doc, CONFIG);
- Configuration conf = new Configuration();
- conf.addResource(CONFIG);
- setUpCluster(conf);
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ JobConf jobConf = new JobConf();
+ String namenode = "file:///";
+ miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
JobClient jc = new JobClient(miniMRCluster.createJobConf());
// test for existing queue
QueueInfo queueInfo = jc.getQueueInfo("q1");
@@ -105,7 +107,5 @@ public class TestJobQueueClient {
// try getting a non-existing queue
queueInfo = jc.getQueueInfo("queue");
assertNull(queueInfo);
-
- new File(CONFIG).delete();
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Sep 17 07:34:39 2010
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
@@ -202,6 +201,13 @@ public class TestLocalizationWithLinuxTa
File jobLogDir = TaskLog.getJobDir(jobId);
checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+ // check job-acls.xml file permissions
+ checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+ + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+ ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+
+ // validate the content of job ACLs file
+ validateJobACLsFileContent();
}
@Override
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java Fri Sep 17 07:34:39 2010
@@ -18,8 +18,11 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import javax.security.auth.login.LoginException;
import junit.framework.TestCase;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -31,58 +34,38 @@ public class TestQueueAclsForCurrentUser
private QueueManager queueManager;
private JobConf conf = null;
UserGroupInformation currentUGI = null;
- String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName();
- String adminAcl = Queue.QueueOperation.ADMINISTER_JOBS.getAclName();
+ String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+ String adminAcl = QueueACL.ADMINISTER_JOBS.getAclName();
+
+ @Override
+ protected void tearDown() {
+ deleteQueuesConfigFile();
+ }
- private void setupConfForNoAccess() throws IOException,LoginException {
+ // No access for queues for the user currentUGI
+ private void setupConfForNoAccess() throws Exception {
currentUGI = UserGroupInformation.getLoginUser();
String userName = currentUGI.getUserName();
- conf = new JobConf();
- conf.setBoolean("mapred.acls.enabled",true);
+ String[] queueNames = {"qu1", "qu2"};
+ // Only user u1 has access for queue qu1
+ // Only group g2 has acls for the queue qu2
+ createQueuesConfigFile(
+ queueNames, new String[]{"u1", " g2"}, new String[]{"u1", " g2"});
- conf.set("mapred.queue.names", "qu1,qu2");
- //Only user u1 has access
- conf.set("mapred.queue.qu1.acl-submit-job", "u1");
- conf.set("mapred.queue.qu1.acl-administer-jobs", "u1");
- //q2 only group g2 has acls for the queues
- conf.set("mapred.queue.qu2.acl-submit-job", " g2");
- conf.set("mapred.queue.qu2.acl-administer-jobs", " g2");
- queueManager = new QueueManager(conf);
+ conf = new JobConf();
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ queueManager = new QueueManager(conf);
}
/**
* sets up configuration for acls test.
* @return
*/
- private void setupConf(boolean aclSwitch) throws IOException,LoginException{
+ private void setupConf(boolean aclSwitch) throws Exception{
currentUGI = UserGroupInformation.getLoginUser();
String userName = currentUGI.getUserName();
- conf = new JobConf();
-
- conf.setBoolean("mapred.acls.enabled", aclSwitch);
-
- conf.set("mapred.queue.names", "qu1,qu2,qu3,qu4,qu5,qu6,qu7");
- //q1 Has acls for all the users, supports both submit and administer
- conf.set("mapred.queue.qu1.acl-submit-job", "*");
- conf.set("mapred.queue.qu1-acl-administer-jobs", "*");
- //q2 only u2 has acls for the queues
- conf.set("mapred.queue.qu2.acl-submit-job", "u2");
- conf.set("mapred.queue.qu2.acl-administer-jobs", "u2");
- //q3 Only u2 has submit operation access rest all have administer access
- conf.set("mapred.queue.qu3.acl-submit-job", "u2");
- conf.set("mapred.queue.qu3.acl-administer-jobs", "*");
- //q4 Only u2 has administer access , anyone can do submit
- conf.set("mapred.queue.qu4.acl-submit-job", "*");
- conf.set("mapred.queue.qu4.acl-administer-jobs", "u2");
- //qu6 only current user has submit access
- conf.set("mapred.queue.qu6.acl-submit-job",userName);
- conf.set("mapred.queue.qu6.acl-administrator-jobs","u2");
- //qu7 only current user has administrator access
- conf.set("mapred.queue.qu7.acl-submit-job","u2");
- conf.set("mapred.queue.qu7.acl-administrator-jobs",userName);
- //qu8 only current group has access
StringBuilder groupNames = new StringBuilder("");
String[] ugiGroupNames = currentUGI.getGroupNames();
int max = ugiGroupNames.length-1;
@@ -92,22 +75,38 @@ public class TestQueueAclsForCurrentUser
groupNames.append(",");
}
}
- conf.set("mapred.queue.qu5.acl-submit-job"," "+groupNames.toString());
- conf.set("mapred.queue.qu5.acl-administrator-jobs"," "
- +groupNames.toString());
+ String groupsAcl = " " + groupNames.toString();
+
+ //qu1 all users have both submit and administer access
+ //qu2 only u2 has submit and administer access
+ //qu3 only u2 has submit access; everyone has administer access
+ //qu4 everyone has submit access; only u2 has administer access
+ //qu5 only the current user's groups have submit and administer access
+ //qu6 only the current user has submit access; only u2 has administer access
+ //qu7 only u2 has submit access; only the current user has administer access
+ String[] queueNames =
+ {"qu1", "qu2", "qu3", "qu4", "qu5", "qu6", "qu7"};
+ String[] submitAcls =
+ {"*", "u2", "u2", "*", groupsAcl, userName, "u2"};
+ String[] adminsAcls =
+ {"*", "u2", "*", "u2", groupsAcl, "u2", userName};
+ createQueuesConfigFile(queueNames, submitAcls, adminsAcls);
+
+ conf = new JobConf();
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclSwitch);
queueManager = new QueueManager(conf);
}
- public void testQueueAclsForCurrentuser() throws IOException,LoginException {
+ public void testQueueAclsForCurrentuser() throws Exception {
setupConf(true);
QueueAclsInfo[] queueAclsInfoList =
queueManager.getQueueAcls(currentUGI);
checkQueueAclsInfo(queueAclsInfoList);
}
- public void testQueueAclsForCurrentUserAclsDisabled() throws IOException,
- LoginException {
+ // Acls are disabled on the mapreduce cluster
+ public void testQueueAclsForCurrentUserAclsDisabled() throws Exception {
setupConf(false);
//fetch the acls info for current user.
QueueAclsInfo[] queueAclsInfoList = queueManager.
@@ -115,7 +114,7 @@ public class TestQueueAclsForCurrentUser
checkQueueAclsInfo(queueAclsInfoList);
}
- public void testQueueAclsForNoAccess() throws IOException,LoginException {
+ public void testQueueAclsForNoAccess() throws Exception {
setupConfForNoAccess();
QueueAclsInfo[] queueAclsInfoList = queueManager.
getQueueAcls(currentUGI);
@@ -124,7 +123,7 @@ public class TestQueueAclsForCurrentUser
private void checkQueueAclsInfo(QueueAclsInfo[] queueAclsInfoList)
throws IOException {
- if (conf.get("mapred.acls.enabled").equalsIgnoreCase("true")) {
+ if (conf.get(MRConfig.MR_ACLS_ENABLED).equalsIgnoreCase("true")) {
for (int i = 0; i < queueAclsInfoList.length; i++) {
QueueAclsInfo acls = queueAclsInfoList[i];
String queueName = acls.getQueueName();
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep 17 07:34:39 2010
@@ -22,8 +22,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
@@ -31,7 +34,6 @@ import org.junit.After;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import java.io.File;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
@@ -47,26 +49,32 @@ public class TestQueueManager {
@After
public void tearDown() throws Exception {
- new File(CONFIG).delete();
+ deleteQueuesConfigFile();
+ }
+
+ // create UGI with the given user name and the fixed group name "myGroup"
+ private UserGroupInformation createUGI(String userName) {
+ return UserGroupInformation.createUserForTesting(
+ userName, new String[]{"myGroup"});
}
@Test
public void testDefault() throws Exception {
+ deleteQueuesConfigFile();
QueueManager qm = new QueueManager();
Queue root = qm.getRoot();
assertEquals(root.getChildren().size(), 1);
assertEquals(root.getChildren().iterator().next().getName(), "default");
- assertFalse(qm.isAclsEnabled());
assertNull(root.getChildren().iterator().next().getChildren());
}
@Test
public void testXMLParsing() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Set<Queue> rootQueues = qm.getRoot().getChildren();
List<String> names = new ArrayList<String>();
for (Queue q : rootQueues) {
@@ -101,62 +109,63 @@ public class TestQueueManager {
assertTrue(
q.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
q.getName(), ACL_SUBMIT_JOB_TAG)).isUserAllowed(
- UserGroupInformation.createRemoteUser("u1")));
+ createUGI("u1")));
assertTrue(
q.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
q.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+ .isUserAllowed(createUGI("u2")));
assertTrue(q.getState().equals(QueueState.STOPPED));
}
@Test
public void testhasAccess() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- createSimpleDocumentWithAcls(doc,"true");
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ createSimpleDocumentWithAcls(doc);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
UserGroupInformation ugi;
// test for acls access when acls are set with *
- ugi = UserGroupInformation.createRemoteUser("u1");
+ ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ QueueACL.ADMINISTER_JOBS, ugi));
// test for acls access when acls are not set with *
- ugi = UserGroupInformation.createRemoteUser("u1");
+ ugi = createUGI("u1");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ QueueACL.ADMINISTER_JOBS, ugi));
- // test for acls access when acls are not specified but acls is enabled
- ugi = UserGroupInformation.createRemoteUser("u1");
- assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
- ugi = UserGroupInformation.createRemoteUser("u2");
- assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
- Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ // Test for acls access when acls are not specified but acls are enabled.
+ // By default, the acls of any queue are empty, so access is denied to every user.
+ ugi = createUGI("u1");
+ assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ QueueACL.SUBMIT_JOB, ugi));
+ ugi = createUGI("u2");
+ assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ QueueACL.ADMINISTER_JOBS, ugi));
assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
}
@Test
public void testQueueView() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
for (Queue queue : qm.getRoot().getChildren()) {
checkHierarchy(queue, qm);
@@ -176,25 +185,21 @@ public class TestQueueManager {
@Test
public void testhasAccessForParent() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser("u1");
- assertFalse(
- qm.hasAccess(
- "p1",
- Queue.QueueOperation.SUBMIT_JOB, ugi));
+ UserGroupInformation ugi = createUGI("u1");
+ assertFalse(qm.hasAccess("p1", QueueACL.SUBMIT_JOB, ugi));
}
@Test
public void testValidation() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
@@ -203,9 +208,9 @@ public class TestQueueManager {
q1.appendChild(createQueue(doc, "p16"));
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
LOG.info(re.getMessage());
@@ -214,27 +219,27 @@ public class TestQueueManager {
@Test
public void testInvalidName() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
LOG.info(re.getMessage());
}
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
- queues = createQueuesNode(doc, "false");
+ queues = createQueuesNode(doc);
q1 = doc.createElement("queue");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (RuntimeException re) {
re.printStackTrace();
@@ -244,10 +249,10 @@ public class TestQueueManager {
@Test
public void testMissingConfigFile() throws Exception {
- checkForConfigFile(); // deletes file
+ deleteQueuesConfigFile(); // deletes file
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception for missing file when " +
"explicitly passed.");
} catch (RuntimeException re) {
@@ -266,9 +271,9 @@ public class TestQueueManager {
@Test
public void testEmptyProperties() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "q1");
Element p = createProperties(doc, null);
q1.appendChild(p);
@@ -277,11 +282,11 @@ public class TestQueueManager {
@Test
public void testEmptyFile() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- new QueueManager(CONFIG);
+ new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
fail("Should throw an exception as configuration is wrong ");
} catch (Exception re) {
re.printStackTrace();
@@ -291,11 +296,11 @@ public class TestQueueManager {
@Test
public void testJobQueueInfoGeneration() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
List<JobQueueInfo> rootQueues =
qm.getRoot().getJobQueueInfo().getChildren();
@@ -338,11 +343,11 @@ public class TestQueueManager {
*/
@Test
public void testRefresh() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
Queue beforeRefreshRoot = qm.getRoot();
//remove the file and create new one.
Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
@@ -360,16 +365,16 @@ public class TestQueueManager {
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(), ACL_SUBMIT_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u1")));
+ .isUserAllowed(createUGI("u1")));
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+ .isUserAllowed(createUGI("u2")));
assertTrue(child.getState().equals(QueueState.STOPPED));
} else {
assertTrue(child.getState().equals(QueueState.RUNNING));
@@ -377,11 +382,11 @@ public class TestQueueManager {
}
}
}
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
refreshSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, true);
qm.getRoot().isHierarchySameAs(cp.getRoot());
qm.setQueues(
cp.getRoot().getChildren().toArray(
@@ -403,17 +408,17 @@ public class TestQueueManager {
"p1" + NAME_SEPARATOR + "p12")) {
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_SUBMIT_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u3")));
+ .isUserAllowed(createUGI("u3")));
assertTrue(
child.getAcls().get(
- QueueManager.toFullPropertyName(
+ toFullPropertyName(
child.getName(),
ACL_ADMINISTER_JOB_TAG))
- .isUserAllowed(UserGroupInformation.createRemoteUser("u4")));
+ .isUserAllowed(createUGI("u4")));
assertTrue(child.getState().equals(QueueState.RUNNING));
} else {
assertTrue(child.getState().equals(QueueState.STOPPED));
@@ -425,20 +430,20 @@ public class TestQueueManager {
@Test
public void testRefreshWithInvalidFile() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
- QueueManager qm = new QueueManager(CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+ QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
- checkForConfigFile();
+ deleteQueuesConfigFile();
doc = createDocument();
- Element queues = createQueuesNode(doc, "false");
+ Element queues = createQueuesNode(doc);
Element q1 = createQueue(doc, "");
queues.appendChild(q1);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
try {
- QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, false);
fail("Should throw an exception as configuration is wrong ");
} catch (Throwable re) {
@@ -548,19 +553,21 @@ public class TestQueueManager {
*/
@Test
public void testDumpConfiguration() throws Exception {
- checkForConfigFile();
+ deleteQueuesConfigFile();
Document doc = createDocument();
createSimpleDocument(doc);
- writeToFile(doc, CONFIG);
+ writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+
StringWriter out = new StringWriter();
- QueueManager.dumpConfiguration(out,CONFIG,null);
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+ QueueManager.dumpConfiguration(out, QUEUES_CONFIG_FILE_PATH, conf);
+
ObjectMapper mapper = new ObjectMapper();
// parse the Json dump
JsonQueueTree queueTree =
mapper.readValue(out.toString(), JsonQueueTree.class);
-
- // check if acls_enabled is correct
- assertEquals(true, queueTree.isAcls_enabled());
+
// check for the number of top-level queues
assertEquals(2, queueTree.getQueues().length);