Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/09/17 09:34:40 UTC

svn commit: r998003 [2/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/docs/src...

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Fri Sep 17 07:34:39 2010
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.Queue.QueueOperation;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -72,11 +72,17 @@ class QueueConfigurationParser {
   static final String QUEUE_TAG = "queue";
   static final String ACL_SUBMIT_JOB_TAG = "acl-submit-job";
   static final String ACL_ADMINISTER_JOB_TAG = "acl-administer-jobs";
+
+  // The value read from queues config file for this tag is not used at all.
+  // To enable queue acls and job acls, mapreduce.cluster.acls.enabled is
+  // to be set in mapred-site.xml
+  @Deprecated
+  static final String ACLS_ENABLED_TAG = "aclsEnabled";
+
   static final String PROPERTIES_TAG = "properties";
   static final String STATE_TAG = "state";
   static final String QUEUE_NAME_TAG = "name";
   static final String QUEUES_TAG = "queues";
-  static final String ACLS_ENABLED_TAG = "aclsEnabled";
   static final String PROPERTY_TAG = "property";
   static final String KEY_TAG = "key";
   static final String VALUE_TAG = "value";
@@ -88,7 +94,8 @@ class QueueConfigurationParser {
     
   }
 
-  QueueConfigurationParser(String confFile) {
+  QueueConfigurationParser(String confFile, boolean areAclsEnabled) {
+    aclsEnabled = areAclsEnabled;
     File file = new File(confFile).getAbsoluteFile();
     if (!file.exists()) {
       throw new RuntimeException("Configuration file not found at " +
@@ -105,7 +112,8 @@ class QueueConfigurationParser {
     }
   }
 
-  QueueConfigurationParser(InputStream xmlInput) {
+  QueueConfigurationParser(InputStream xmlInput, boolean areAclsEnabled) {
+    aclsEnabled = areAclsEnabled;
     loadFrom(xmlInput);
   }
 
@@ -184,8 +192,14 @@ class QueueConfigurationParser {
       NamedNodeMap nmp = queuesNode.getAttributes();
       Node acls = nmp.getNamedItem(ACLS_ENABLED_TAG);
 
-      if (acls != null && acls.getTextContent().equalsIgnoreCase("true")) {
-        setAclsEnabled(true);
+      if (acls != null) {
+        LOG.warn("Configuring " + ACLS_ENABLED_TAG + " flag in " +
+            QueueManager.QUEUE_CONF_FILE_NAME + " is not valid. " +
+            "This tag is ignored. Configure " +
+            MRConfig.MR_ACLS_ENABLED + " in mapred-site.xml. See the " +
+            " documentation of " + MRConfig.MR_ACLS_ENABLED +
+            ", which is used for enabling job level authorization and " +
+            " queue level authorization.");
       }
 
       NodeList props = queuesNode.getChildNodes();
@@ -269,9 +283,9 @@ class QueueConfigurationParser {
         name += nameValue;
         newQueue.setName(name);
         submitKey = toFullPropertyName(name,
-            Queue.QueueOperation.SUBMIT_JOB.getAclName());
+            QueueACL.SUBMIT_JOB.getAclName());
         adminKey = toFullPropertyName(name,
-            Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
+            QueueACL.ADMINISTER_JOBS.getAclName());
       }
 
       if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -299,11 +313,11 @@ class QueueConfigurationParser {
     }
     
     if (!acls.containsKey(submitKey)) {
-      acls.put(submitKey, new AccessControlList("*"));
+      acls.put(submitKey, new AccessControlList(" "));
     }
     
     if (!acls.containsKey(adminKey)) {
-      acls.put(adminKey, new AccessControlList("*"));
+      acls.put(adminKey, new AccessControlList(" "));
     }
     
     //Set acls

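With this change the aclsEnabled attribute in mapred-queues.xml is only warned about and otherwise ignored; ACLs are switched on cluster-wide through mapreduce.cluster.acls.enabled (MRConfig.MR_ACLS_ENABLED) in mapred-site.xml and handed to the parser through its constructors. A minimal sketch of how the reworked constructor is driven, assuming package-private access from within org.apache.hadoop.mapred; the helper class and file path are illustrative only:

package org.apache.hadoop.mapred;

import org.apache.hadoop.mapreduce.MRConfig;

class QueueParserUsageSketch {
  // Build the parser with the cluster-wide ACL flag read from mapred-site.xml.
  static QueueConfigurationParser parse(String queuesConfFile) {
    JobConf clusterConf = new JobConf();   // loads mapred-default.xml / mapred-site.xml
    boolean aclsEnabled =
        clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
    return new QueueConfigurationParser(queuesConfFile, aclsEnabled);
  }
}
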
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java Fri Sep 17 07:34:39 2010
@@ -20,11 +20,11 @@ package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -39,7 +39,6 @@ import java.io.IOException;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.List;
@@ -82,24 +81,25 @@ import java.net.URL;
  * in mapred-site.xml. However, when configured in the latter, there is
  * no support for hierarchical queues.
  */
-
-class QueueManager {
+@InterfaceAudience.Private
+public class QueueManager {
 
   private static final Log LOG = LogFactory.getLog(QueueManager.class);
 
   // Map of a queue name and Queue object
   private Map<String, Queue> leafQueues = new HashMap<String,Queue>();
   private Map<String, Queue> allQueues = new HashMap<String, Queue>();
-  static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
+  public static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml";
   static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml";
 
-  // Prefix in configuration for queue related keys
-  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
-    = "mapred.queue.";
-  //Resource in which queue acls are configured.
+  //Prefix in configuration for queue related keys
+  static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = "mapred.queue.";
 
+  //Resource in which queue acls are configured.
   private Queue root = null;
-  private boolean isAclEnabled = false;
+  
+  // represents if job and queue acls are enabled on the mapreduce cluster
+  private boolean areAclsEnabled = false;
 
   /**
    * Factory method to create an appropriate instance of a queue
@@ -117,7 +117,7 @@ class QueueManager {
    * @return Queue configuration parser
    */
   static QueueConfigurationParser getQueueConfigurationParser(
-    Configuration conf, boolean reloadConf) {
+    Configuration conf, boolean reloadConf, boolean areAclsEnabled) {
     if (conf != null && conf.get(
       DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY) != null) {
       if (reloadConf) {
@@ -136,7 +136,8 @@ class QueueManager {
       InputStream stream = null;
       try {
         stream = xmlInUrl.openStream();
-        return new QueueConfigurationParser(new BufferedInputStream(stream));
+        return new QueueConfigurationParser(new BufferedInputStream(stream),
+            areAclsEnabled);
       } catch (IOException ioe) {
         throw new RuntimeException("Couldn't open queue configuration at " +
                                    xmlInUrl, ioe);
@@ -146,8 +147,13 @@ class QueueManager {
     }
   }
 
-  public QueueManager() {
-    initialize(getQueueConfigurationParser(null, false));
+  QueueManager() {// acls are disabled
+    this(false);
+  }
+
+  QueueManager(boolean areAclsEnabled) {
+    this.areAclsEnabled = areAclsEnabled;
+    initialize(getQueueConfigurationParser(null, false, areAclsEnabled));
   }
 
   /**
@@ -159,10 +165,11 @@ class QueueManager {
    * is found in mapred-site.xml, it will then look for site configuration
    * in mapred-queues.xml supporting hierarchical queues.
    *
-   * @param conf Configuration object where queue configuration is specified.
+   * @param clusterConf    mapreduce cluster configuration
    */
-  public QueueManager(Configuration conf) {
-    initialize(getQueueConfigurationParser(conf, false));
+  public QueueManager(Configuration clusterConf) {
+    areAclsEnabled = clusterConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    initialize(getQueueConfigurationParser(clusterConf, false, areAclsEnabled));
   }
 
   /**
@@ -174,8 +181,10 @@ class QueueManager {
    *
    * @param confFile File where the queue configuration is found.
    */
-  QueueManager(String confFile) {
-    QueueConfigurationParser cp = new QueueConfigurationParser(confFile);
+  QueueManager(String confFile, boolean areAclsEnabled) {
+    this.areAclsEnabled = areAclsEnabled;
+    QueueConfigurationParser cp =
+        new QueueConfigurationParser(confFile, areAclsEnabled);
     initialize(cp);
   }
 
@@ -196,10 +205,8 @@ class QueueManager {
     allQueues.putAll(leafQueues);
 
     LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
-    this.isAclEnabled = cp.isAclsEnabled();
   }
 
-
   /**
    * Return the set of leaf level queues configured in the system to
    * which jobs are submitted.
@@ -215,52 +222,22 @@ class QueueManager {
   }
 
   /**
-   * Return true if the given {@link Queue.QueueOperation} can be
-   * performed by the specified user on the given queue.
+   * Return true if the given user is part of the ACL for the given
+   * {@link QueueACL} name for the given queue.
    * <p/>
    * An operation is allowed if all users are provided access for this
    * operation, or if either the user or any of the groups specified is
    * provided access.
    *
    * @param queueName Queue on which the operation needs to be performed.
-   * @param oper      The operation to perform
+   * @param qACL      The queue ACL name to be checked
    * @param ugi       The user and groups who wish to perform the operation.
-   * @return true if the operation is allowed, false otherwise.
+   * @return true     if the operation is allowed, false otherwise.
    */
   public synchronized boolean hasAccess(
-    String queueName,
-    Queue.QueueOperation oper,
-    UserGroupInformation ugi) {
-    return hasAccess(queueName, null, oper, ugi);
-  }
+    String queueName, QueueACL qACL, UserGroupInformation ugi) {
 
-  /**
-   * Return true if the given {@link Queue.QueueOperation} can be
-   * performed by the specified user on the specified job in the given queue.
-   * <p/>
-   * An operation is allowed either if the owner of the job is the user
-   * performing the task, all users are provided access for this
-   * operation, or if either the user or any of the groups specified is
-   * provided access.
-   * <p/>
-   * If the {@link Queue.QueueOperation} is not job specific then the
-   * job parameter is ignored.
-   *
-   * @param queueName Queue on which the operation needs to be performed.
-   * @param job       The {@link JobInProgress} on which the operation is being
-   *                  performed.
-   * @param oper      The operation to perform
-   * @param ugi       The user and groups who wish to perform the operation.
-   * @return true if the operation is allowed, false otherwise.
-   */
-  public synchronized boolean hasAccess(
-    String queueName, JobInProgress job,
-    Queue.QueueOperation oper,
-    UserGroupInformation ugi) {
-    
     Queue q = leafQueues.get(queueName);
-    String user = ugi.getShortUserName();
-    String jobId = job == null ? "-" : job.getJobID().toString();
 
     if (q == null) {
       LOG.info("Queue " + queueName + " is not present");
@@ -272,50 +249,23 @@ class QueueManager {
       return false;
     }
 
-    if (!isAclsEnabled()) {
+    if (!areAclsEnabled()) {
       return true;
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "checking access for : "
-          + QueueManager.toFullPropertyName(queueName, oper.getAclName()));
-    }
-
-    if (oper.isJobOwnerAllowed()) {
-      if (job != null
-        && job.getJobConf().getUser().equals(ugi.getShortUserName())) {
-        AuditLogger.logSuccess(user, oper.name(), queueName);
-        return true;
-      }
+      LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
+        qACL.getAclName()) + " for user " + ugi.getShortUserName());
     }
 
     AccessControlList acl = q.getAcls().get(
-      toFullPropertyName(
-        queueName,
-        oper.getAclName()));
+        toFullPropertyName(queueName, qACL.getAclName()));
     if (acl == null) {
-      AuditLogger.logFailure(user, oper.name(), null, queueName,
-                             "Disabled queue ACLs, job : " + jobId);
       return false;
     }
 
-    // Check the ACL list
-    boolean allowed = acl.isAllAllowed();
-    if (!allowed) {
-      // Check the allowed users list
-      if (acl.isUserAllowed(ugi)) {
-        allowed = true;
-      }
-    }
-    if (allowed) {
-      AuditLogger.logSuccess(user, oper.name(), queueName);
-    } else {
-      AuditLogger.logFailure(user, oper.name(), null, queueName,
-                             Constants.UNAUTHORIZED_USER + ", job : " + jobId);
-    }
-
-    return allowed;
+    // Check if user is part of the ACL
+    return acl.isUserAllowed(ugi);
   }
 
   /**
@@ -392,7 +342,7 @@ class QueueManager {
 
     // Create a new configuration parser using the passed conf object.
     QueueConfigurationParser cp =
-        QueueManager.getQueueConfigurationParser(conf, true);
+        getQueueConfigurationParser(conf, true, areAclsEnabled);
 
     /*
      * (1) Validate the refresh of properties owned by QueueManager. As of now,
@@ -441,7 +391,8 @@ class QueueManager {
     LOG.info("Queue configuration is refreshed successfully.");
   }
 
-  static final String toFullPropertyName(
+  // this method is for internal use only
+  public static final String toFullPropertyName(
     String queue,
     String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -512,16 +463,16 @@ class QueueManager {
     //List of all QueueAclsInfo objects , this list is returned
     ArrayList<QueueAclsInfo> queueAclsInfolist =
       new ArrayList<QueueAclsInfo>();
-    Queue.QueueOperation[] operations = Queue.QueueOperation.values();
+    QueueACL[] qAcls = QueueACL.values();
     for (String queueName : leafQueues.keySet()) {
       QueueAclsInfo queueAclsInfo = null;
       ArrayList<String> operationsAllowed = null;
-      for (Queue.QueueOperation operation : operations) {
-        if (hasAccess(queueName, operation, ugi)) {
+      for (QueueACL qAcl : qAcls) {
+        if (hasAccess(queueName, qAcl, ugi)) {
           if (operationsAllowed == null) {
             operationsAllowed = new ArrayList<String>();
           }
-          operationsAllowed.add(operation.getAclName());
+          operationsAllowed.add(qAcl.getAclName());
         }
       }
       if (operationsAllowed != null) {
@@ -534,8 +485,7 @@ class QueueManager {
       }
     }
     return queueAclsInfolist.toArray(
-      new QueueAclsInfo[
-        queueAclsInfolist.size()]);
+      new QueueAclsInfo[queueAclsInfolist.size()]);
   }
 
   /**
@@ -611,8 +561,8 @@ class QueueManager {
    *
    * @return true if ACLs are enabled.
    */
-  boolean isAclsEnabled() {
-    return isAclEnabled;
+  boolean areAclsEnabled() {
+    return areAclsEnabled;
   }
 
   /**
@@ -623,7 +573,30 @@ class QueueManager {
   Queue getRoot() {
     return root;
   }
-  
+
+  /**
+   * Returns the specific queue ACL for the given queue.
+   * Returns null if the given queue does not exist or the acl is not
+   * configured for that queue.
+   * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
+   * ACL with all users.
+   */
+  synchronized AccessControlList getQueueACL(String queueName,
+      QueueACL qACL) {
+    if (areAclsEnabled) {
+      Queue q = leafQueues.get(queueName);
+      if (q != null) {
+        return q.getAcls().get(toFullPropertyName(
+          queueName, qACL.getAclName()));
+      }
+      else {
+        LOG.warn("Queue " + queueName + " is not present.");
+        return null;
+      }
+    }
+    return new AccessControlList("*");
+  }
+
   /**
    * Dumps the configuration of hierarchy of queues
    * @param out the writer object to which dump is written
@@ -646,17 +619,21 @@ class QueueManager {
         MAPRED_QUEUE_NAMES_KEY) != null) {
       return;
     }
+    
     JsonFactory dumpFactory = new JsonFactory();
     JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
     QueueConfigurationParser parser;
+    boolean aclsEnabled = false;
+    if (conf != null) {
+      aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+    }
     if (configFile != null && !"".equals(configFile)) {
-      parser = new QueueConfigurationParser(configFile);
+      parser = new QueueConfigurationParser(configFile, aclsEnabled);
     }
     else {
-      parser = QueueManager.getQueueConfigurationParser(null, false);
+      parser = getQueueConfigurationParser(null, false, aclsEnabled);
     }
     dumpGenerator.writeStartObject();
-    dumpGenerator.writeBooleanField("acls_enabled", parser.isAclsEnabled());
     dumpGenerator.writeFieldName("queues");
     dumpGenerator.writeStartArray();
     dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
@@ -684,11 +661,11 @@ class QueueManager {
       AccessControlList administerJobsList = null;
       if (queue.getAcls() != null) {
         submitJobList =
-          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
-              Queue.QueueOperation.SUBMIT_JOB.getAclName()));
+          queue.getAcls().get(toFullPropertyName(queue.getName(),
+              QueueACL.SUBMIT_JOB.getAclName()));
         administerJobsList =
-          queue.getAcls().get(QueueManager.toFullPropertyName(queue.getName(),
-              Queue.QueueOperation.ADMINISTER_JOBS.getAclName()));
+          queue.getAcls().get(toFullPropertyName(queue.getName(),
+              QueueACL.ADMINISTER_JOBS.getAclName()));
       }
       String aclsSubmitJobValue = " ";
       if (submitJobList != null ) {

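hasAccess() now takes a QueueACL instead of Queue.QueueOperation, drops the job-owner shortcut and the audit logging (both handled elsewhere now), and simply answers whether the caller is in the queue's ACL; when ACLs are disabled it always returns true, and the new getQueueACL() likewise returns a wildcard ACL in that case. A short sketch of a caller-side check under the new signature; the wrapper class and queue name are illustrative:

package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

class QueueAccessCheckSketch {
  // True when ACLs are disabled, or when the current user (or one of the
  // user's groups) is listed in mapred.queue.<queue>.acl-submit-job.
  static boolean canSubmit(Configuration clusterConf, String queue)
      throws IOException {
    QueueManager qm = new QueueManager(clusterConf);
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    return qm.hasAccess(queue, QueueACL.SUBMIT_JOB, ugi);
  }
}
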
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Fri Sep 17 07:34:39 2010
@@ -30,9 +30,9 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -121,42 +121,54 @@ public class TaskLogServlet extends Http
    * users and groups specified in configuration using
    * mapreduce.job.acl-view-job to view job.
    */
-  private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+  private void checkAccessForTaskLogs(JobConf conf, String user, String jobId,
       TaskTracker tracker) throws AccessControlException {
 
-    if (!tracker.isJobLevelAuthorizationEnabled()) {
+    if (!tracker.areACLsEnabled()) {
       return;
     }
 
-    // buiild job view acl by reading from conf
+    // buiild job view ACL by reading from conf
     AccessControlList jobViewACL = tracker.getJobACLsManager().
         constructJobACLs(conf).get(JobACL.VIEW_JOB);
 
+    // read job queue name from conf
+    String queue = conf.getQueueName();
+
+    // build queue admins ACL by reading from conf
+    AccessControlList queueAdminsACL = new AccessControlList(
+        conf.get(toFullPropertyName(queue,
+            QueueACL.ADMINISTER_JOBS.getAclName()), " "));
+
     String jobOwner = conf.get(JobContext.USER_NAME);
     UserGroupInformation callerUGI =
         UserGroupInformation.createRemoteUser(user);
 
-    tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
-        jobOwner, jobViewACL);
+    // check if user is queue admin or cluster admin or jobOwner or member of
+    // job-view-acl
+    if (!queueAdminsACL.isUserAllowed(callerUGI)) {
+      tracker.getACLsManager().checkAccess(jobId, callerUGI, queue,
+          Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL);
+    }
   }
 
   /**
-   * Builds a Configuration object by reading the xml file.
+   * Builds a JobConf object by reading the job-acls.xml file.
    * This doesn't load the default resources.
    *
-   * Returns null if job-acls.xml is not there in userlogs/$jobid/attempt-dir on
+   * Returns null if job-acls.xml is not there in userlogs/$jobid on
    * local file system. This can happen when we restart the cluster with job
    * level authorization enabled(but was disabled on earlier cluster) and
    * viewing task logs of old jobs(i.e. jobs finished on earlier unsecure
    * cluster).
    */
-  static Configuration getConfFromJobACLsFile(TaskAttemptID attemptId,
-      boolean isCleanup) {
+  static JobConf getConfFromJobACLsFile(JobID jobId) {
     Path jobAclsFilePath = new Path(
-        TaskLog.getAttemptDir(attemptId, isCleanup).toString(), TaskRunner.jobACLsFile);
-    Configuration conf = null;
+        TaskLog.getJobDir(jobId).toString(),
+        TaskTracker.jobACLsFile);
+    JobConf conf = null;
     if (new File(jobAclsFilePath.toUri().getPath()).exists()) {
-      conf = new Configuration(false);
+      conf = new JobConf(false);
       conf.addResource(jobAclsFilePath);
     }
     return conf;
@@ -228,15 +240,15 @@ public class TaskLogServlet extends Http
       ServletContext context = getServletContext();
       TaskTracker taskTracker = (TaskTracker) context.getAttribute(
           "task.tracker");
+      JobID jobId = attemptId.getJobID();
+
       // get jobACLConf from ACLs file
-      Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
+      JobConf jobACLConf = getConfFromJobACLsFile(jobId);
       // Ignore authorization if job-acls.xml is not found
       if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
-
         try {
-          checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,
-              taskTracker);
+          checkAccessForTaskLogs(jobACLConf, user,
+              jobId.toString(), taskTracker);
         } catch (AccessControlException e) {
           String errMsg = "User " + user + " failed to view tasklogs of job " +
               jobId + "!\n\n" + e.getMessage();

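Task-log viewing is now granted to administrators of the job's queue in addition to the job owner, cluster admins and members of mapreduce.job.acl-view-job, and the queue-admins ACL is reconstructed from the job-acls.xml snapshot rather than from live queue configuration. A sketch of how such a snapshot is evaluated for a caller, mirroring the key format used above; the helper class is illustrative:

package org.apache.hadoop.mapred;

import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;

class TaskLogQueueAdminSketch {
  // Is the remote caller a queue admin according to the job-acls.xml snapshot?
  static boolean isQueueAdmin(JobConf jobAclsConf, String caller) {
    String queue = jobAclsConf.getQueueName();
    AccessControlList queueAdminsACL = new AccessControlList(
        jobAclsConf.get(toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), " "));
    return queueAdminsACL.isUserAllowed(
        UserGroupInformation.createRemoteUser(caller));
  }
}
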
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 17 07:34:39 2010
@@ -19,7 +19,6 @@ package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -34,7 +33,6 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -77,8 +75,6 @@ abstract class TaskRunner extends Thread
   protected JobConf conf;
   JvmManager jvmManager;
 
-  static String jobACLsFile = "job-acl.xml";
-
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
       JobConf conf) {
     this.tip = tip;
@@ -286,32 +282,8 @@ abstract class TaskRunner extends Thread
       Localizer.PermissionsHandler.setPermissions(logDir,
           Localizer.PermissionsHandler.sevenZeroZero);
     }
-    // write job acls into a file to know the access for task logs
-    writeJobACLs(logDir);
-    return logFiles;
-  }
 
-  // Writes job-view-acls and user name into an xml file
-  private void writeJobACLs(File logDir) throws IOException {
-    File aclFile = new File(logDir, TaskRunner.jobACLsFile);
-    Configuration aclConf = new Configuration(false);
-
-    // set the job view acls in aclConf
-    String jobViewACLs = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB);
-    if (jobViewACLs != null) {
-      aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACLs);
-    }
-    // set jobOwner as mapreduce.job.user.name in aclConf
-    String jobOwner = conf.getUser();
-    aclConf.set(MRJobConfig.USER_NAME, jobOwner);
-    FileOutputStream out = new FileOutputStream(aclFile);
-    try {
-      aclConf.writeXml(out);
-    } finally {
-      out.close();
-    }
-    Localizer.PermissionsHandler.setPermissions(aclFile,
-        Localizer.PermissionsHandler.sevenZeroZero);
+    return logFiles;
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 17 07:34:39 2010
@@ -20,6 +20,7 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
@@ -78,6 +79,7 @@ import org.apache.hadoop.mapred.TaskTrac
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -160,6 +162,11 @@ public class TaskTracker 
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
+  // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
+  // each job at job localization time. This will be used by TaskLogServlet for
+  // authorizing viewing of task logs of that job
+  static String jobACLsFile = "job-acls.xml";
+
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
@@ -249,9 +256,7 @@ public class TaskTracker 
   private int maxReduceSlots;
   private int failures;
 
-  // MROwner's ugi
-  private UserGroupInformation mrOwner;
-  private String supergroup;
+  private ACLsManager aclsManager;
   
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
@@ -275,9 +280,6 @@ public class TaskTracker 
   private long reservedPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
-  // Manages job acls of jobs in TaskTracker
-  private TaskTrackerJobACLsManager jobACLsManager;
-
   /**
    * the minimum interval between jobtracker polls
    */
@@ -590,13 +592,11 @@ public class TaskTracker 
    * close().
    */
   synchronized void initialize() throws IOException, InterruptedException {
-    UserGroupInformation.setConfiguration(fConf);
-    SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
-    mrOwner = UserGroupInformation.getCurrentUser();
 
-    supergroup = fConf.get(MRConfig.MR_SUPERGROUP, "supergroup");
-    LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
-             + " and supergroup as " + supergroup);
+    aclsManager = new ACLsManager(fConf, new JobACLsManager(fConf), null);
+    LOG.info("Starting tasktracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
 
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
@@ -687,7 +687,8 @@ public class TaskTracker 
     this.distributedCacheManager.startCleanupThread();
 
     this.jobClient = (InterTrackerProtocol) 
-    mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
+    UserGroupInformation.getLoginUser().doAs(
+        new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         return RPC.waitForProxy(InterTrackerProtocol.class,
             InterTrackerProtocol.versionID, 
@@ -735,19 +736,22 @@ public class TaskTracker 
   }
 
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   /**
-   * Is job level authorization enabled on the TT ?
+   * Are ACLs for authorization checks enabled on the MR cluster ?
    */
-  boolean isJobLevelAuthorizationEnabled() {
-    return fConf.getBoolean(
-        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+  boolean areACLsEnabled() {
+    return fConf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
   }
 
   public static Class<?>[] getInstrumentationClasses(Configuration conf) {
@@ -998,7 +1002,7 @@ public class TaskTracker 
        
         JobConf localJobConf = localizeJobFiles(t, rjob);
         // initialize job log directory
-        initializeJobLogDir(jobId);
+        initializeJobLogDir(jobId, localJobConf);
 
         // Now initialize the job via task-controller so as to set
         // ownership/permissions of jars, job-work-dir. Note that initializeJob
@@ -1098,12 +1102,64 @@ public class TaskTracker 
     return localJobConf;
   }
 
-  // create job userlog dir
-  void initializeJobLogDir(JobID jobId) {
+  // Create job userlog dir.
+  // Create job acls file in job log dir, if needed.
+  void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+      throws IOException {
     // remove it from tasklog cleanup thread first,
     // it might be added there because of tasktracker reinit or restart
     taskLogCleanupThread.unmarkJobFromLogDeletion(jobId);
     localizer.initializeJobLogDir(jobId);
+
+    if (areACLsEnabled()) {
+      // Create job-acls.xml file in job userlog dir and write the needed
+      // info for authorization of users for viewing task logs of this job.
+      writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
+    }
+  }
+
+  /**
+   *  Creates job-acls.xml under the given directory logDir and writes
+   *  job-view-acl, queue-admins-acl, jobOwner name and queue name into this
+   *  file.
+   *  queue name is the queue to which the job was submitted to.
+   *  queue-admins-acl is the queue admins ACL of the queue to which this
+   *  job was submitted to.
+   * @param conf   job configuration
+   * @param logDir job userlog dir
+   * @throws IOException
+   */
+  private static void writeJobACLs(JobConf conf, File logDir)
+      throws IOException {
+    File aclFile = new File(logDir, jobACLsFile);
+    JobConf aclConf = new JobConf(false);
+
+    // set the job view acl in aclConf
+    String jobViewACL = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
+    aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set the job queue name in aclConf
+    String queue = conf.getQueueName();
+    aclConf.setQueueName(queue);
+
+    // set the queue admins acl in aclConf
+    String qACLName = toFullPropertyName(queue,
+        QueueACL.ADMINISTER_JOBS.getAclName());
+    String queueAdminsACL = conf.get(qACLName, " ");
+    aclConf.set(qACLName, queueAdminsACL);
+
+    // set jobOwner as user.name in aclConf
+    String jobOwner = conf.getUser();
+    aclConf.set("user.name", jobOwner);
+
+    FileOutputStream out = new FileOutputStream(aclFile);
+    try {
+      aclConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    Localizer.PermissionsHandler.setPermissions(aclFile,
+        Localizer.PermissionsHandler.sevenZeroZero);
   }
 
   /**
@@ -1318,8 +1374,10 @@ public class TaskTracker 
     checkJettyPort(httpPort);
     // create task log cleanup thread
     setTaskLogCleanupThread(new UserLogCleaner(fConf));
-    // Initialize the jobACLSManager
-    jobACLsManager = new TaskTrackerJobACLsManager(this);
+
+    UserGroupInformation.setConfiguration(fConf);
+    SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
+
     initialize();
   }
 
@@ -4043,7 +4101,11 @@ public class TaskTracker 
       return localJobTokenFileStr;
     }
 
-    TaskTrackerJobACLsManager getJobACLsManager() {
-      return jobACLsManager;
+    JobACLsManager getJobACLsManager() {
+      return aclsManager.getJobACLsManager();
+    }
+    
+    ACLsManager getACLsManager() {
+      return aclsManager;
     }
 }

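Writing the per-job ACL snapshot moves from TaskRunner to TaskTracker: at job localization time, when ACLs are enabled, userlogs/$jobid/job-acls.xml is created with the job view ACL, the queue name, the queue-admins ACL and the job owner, then restricted to 700 permissions; TaskLogServlet reads it back later to authorize log viewing. A sketch of the fields a reader can recover from that file, using only the keys this patch writes; the helper class is illustrative:

package org.apache.hadoop.mapred;

import org.apache.hadoop.mapreduce.MRJobConfig;

class JobAclsSnapshotSketch {
  // Pull back the values written by TaskTracker.writeJobACLs().
  static void describe(JobConf jobAclsConf) {
    String viewAcl  = jobAclsConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " ");
    String queue    = jobAclsConf.getQueueName();
    String jobOwner = jobAclsConf.get("user.name");
    System.out.println("view-acl=" + viewAcl
        + " queue=" + queue + " owner=" + jobOwner);
  }
}
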
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Sep 17 07:34:39 2010
@@ -41,12 +41,15 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.QueueACL;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonParseException;
@@ -355,6 +358,14 @@ class JobSubmitter {
       conf.setInt("mapred.map.tasks", maps);
       LOG.info("number of splits:" + maps);
 
+      // write "queue admins of the queue to which job is being submitted"
+      // to job file.
+      String queue = conf.get(MRJobConfig.QUEUE_NAME,
+          JobConf.DEFAULT_QUEUE_NAME);
+      AccessControlList acl = submitClient.getQueueAdmins(queue);
+      conf.set(toFullPropertyName(queue,
+          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
+
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
       

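At submission time the client now asks the cluster for the administrators of the target queue (via the new ClientProtocol.getQueueAdmins() call further below) and records that ACL in the job configuration, so downstream daemons can authorize against it without re-reading queue configuration. The property name follows QueueManager.toFullPropertyName(); a small sketch of the resulting key, assuming the ACL name resolves to acl-administer-jobs:

package org.apache.hadoop.mapred;

import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;

class QueueAdminsKeySketch {
  public static void main(String[] args) {
    // Prints "mapred.queue.default.acl-administer-jobs" (assuming the
    // QueueACL name is "acl-administer-jobs" and the queue is "default").
    System.out.println(toFullPropertyName("default",
        QueueACL.ADMINISTER_JOBS.getAclName()));
  }
}
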
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Sep 17 07:34:39 2010
@@ -37,8 +37,7 @@ public interface MRConfig {
   public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
   public static final String REDUCEMEMORY_MB = 
     "mapreduce.cluster.reducememory.mb";
-  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
-    "mapreduce.cluster.job-authorization-enabled";
+  public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
   public static final String MR_SUPERGROUP =
     "mapreduce.cluster.permissions.supergroup";
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java Fri Sep 17 07:34:39 2010
@@ -60,7 +60,7 @@ public class QueueInfo implements Writab
    */
   public QueueInfo() {
     // make it running by default.
-    this.queueState = queueState.RUNNING;
+    this.queueState = QueueState.RUNNING;
     children = new ArrayList<QueueInfo>();
     props = new Properties();
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Fri Sep 17 07:34:39 2010
@@ -77,8 +77,9 @@
           {"name": "jobConfPath", "type": "string"},
           {"name": "acls", "type": {"type": "map",
                                     "values": "string"
-                                    }
-          }
+                                   }
+          },
+          {"name": "jobQueueName", "type": "string"}
       ]
      },
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Sep 17 07:34:39 2010
@@ -314,6 +314,7 @@ public class JobHistoryParser {
     info.submitTime = event.getSubmitTime();
     info.jobConfPath = event.getJobConfPath();
     info.jobACLs = event.getJobAcls();
+    info.jobQueueName = event.getJobQueueName();
   }
 
   /**
@@ -325,6 +326,7 @@ public class JobHistoryParser {
     JobID jobid;
     String username;
     String jobname;
+    String jobQueueName;
     String jobConfPath;
     long launchTime;
     int totalMaps;
@@ -349,7 +351,7 @@ public class JobHistoryParser {
       submitTime = launchTime = finishTime = -1;
       totalMaps = totalReduces = failedMaps = failedReduces = 0;
       finishedMaps = finishedReduces = 0;
-      username = jobname = jobConfPath = "";
+      username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
     }
@@ -358,6 +360,7 @@ public class JobHistoryParser {
     public void printAll() {
       System.out.println("JOBNAME: " + jobname);
       System.out.println("USERNAME: " + username);
+      System.out.println("JOB_QUEUE_NAME: " + jobQueueName);
       System.out.println("SUBMIT_TIME" + submitTime);
       System.out.println("LAUNCH_TIME: " + launchTime);
       System.out.println("JOB_STATUS: " + jobStatus);
@@ -383,6 +386,8 @@ public class JobHistoryParser {
     public String getUsername() { return username; }
     /** Get the job name */
     public String getJobname() { return jobname; }
+    /** Get the job queue name */
+    public String getJobQueueName() { return jobQueueName; }
     /** Get the path for the job configuration file */
     public String getJobConfPath() { return jobConfPath; }
     /** Get the job launch time */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Fri Sep 17 07:34:39 2010
@@ -40,18 +40,6 @@ public class JobSubmittedEvent implement
   private JobSubmitted datum = new JobSubmitted();
 
   /**
-   * @deprecated Use
-   *             {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
-   *             instead.
-   */
-  @Deprecated
-  public JobSubmittedEvent(JobID id, String jobName, String userName,
-      long submitTime, String jobConfPath) {
-    this(id, jobName, userName, submitTime, jobConfPath,
-        new HashMap<JobACL, AccessControlList>());
-  }
-
-  /**
    * Create an event to record job submission
    * @param id The job Id of the job
    * @param jobName Name of the job
@@ -59,10 +47,11 @@ public class JobSubmittedEvent implement
    * @param submitTime Time of submission
    * @param jobConfPath Path of the Job Configuration file
    * @param jobACLs The configured acls for the job.
+   * @param jobQueueName The job-queue to which this job was submitted to
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
-      Map<JobACL, AccessControlList> jobACLs) {
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
     datum.jobid = new Utf8(id.toString());
     datum.jobName = new Utf8(jobName);
     datum.userName = new Utf8(userName);
@@ -74,6 +63,9 @@ public class JobSubmittedEvent implement
           entry.getValue().getAclString()));
     }
     datum.acls = jobAcls;
+    if (jobQueueName != null) {
+      datum.jobQueueName = new Utf8(jobQueueName);
+    }
   }
 
   JobSubmittedEvent() {}
@@ -87,6 +79,13 @@ public class JobSubmittedEvent implement
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the Job name */
   public String getJobName() { return datum.jobName.toString(); }
+  /** Get the Job queue name */
+  public String getJobQueueName() {
+    if (datum.jobQueueName != null) {
+      return datum.jobQueueName.toString();
+    }
+    return null;
+  }
   /** Get the user name */
   public String getUserName() { return datum.userName.toString(); }
   /** Get the submit time */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Sep 17 07:34:39 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.serve
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -108,8 +109,10 @@ public interface ClientProtocol extends 
    * Version 32: Added delegation tokens (add, renew, cancel)
    * Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
    * Version 34: Modified submitJob to use Credentials instead of TokenStorage.
+   * Version 35: Added the method getQueueAdmins(queueName) as part of
+   *             MAPREDUCE-1664.
    */
-  public static final long versionID = 34L;
+  public static final long versionID = 35L;
 
   /**
    * Allocate a name for the job.
@@ -144,6 +147,17 @@ public interface ClientProtocol extends 
   
   public long getTaskTrackerExpiryInterval() throws IOException,
                                                InterruptedException;
+  
+  /**
+   * Get the administrators of the given job-queue.
+   * This method is for hadoop internal use only.
+   * @param queueName
+   * @return Queue administrators ACL for the queue to which job is
+   *         submitted to
+   * @throws IOException
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException;
+
   /**
    * Kill the indicated job
    */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Sep 17 07:34:39 2010
@@ -54,6 +54,9 @@ public class ConfigUtil {
       new String[] {MRConfig.MAPMEMORY_MB});
     Configuration.addDeprecation("mapred.cluster.reduce.memory.mb", 
       new String[] {MRConfig.REDUCEMEMORY_MB});
+    Configuration.addDeprecation("mapred.acls.enabled", 
+        new String[] {MRConfig.MR_ACLS_ENABLED});
+
     Configuration.addDeprecation("mapred.cluster.max.map.memory.mb", 
       new String[] {JTConfig.JT_MAX_MAPMEMORY_MB});
     Configuration.addDeprecation("mapred.cluster.max.reduce.memory.mb", 

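The old mapred.acls.enabled key is registered as a deprecated alias for mapreduce.cluster.acls.enabled, so sites that still set the old name keep working. A sketch of that mapping in action; it assumes the deprecations have been registered (ConfigUtil.loadResources()) before the configuration is read:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.util.ConfigUtil;

class AclsKeyDeprecationSketch {
  public static void main(String[] args) {
    ConfigUtil.loadResources();               // registers the deprecated keys
    Configuration conf = new Configuration(false);
    conf.set("mapred.acls.enabled", "true");  // old key name
    // Should read back as true through the new key via the deprecation map.
    System.out.println(conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false));
  }
}
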
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java Fri Sep 17 07:34:39 2010
@@ -22,7 +22,6 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.QueueState;
@@ -31,15 +30,6 @@ import org.apache.hadoop.mapreduce.serve
 import org.apache.hadoop.security.UserGroupInformation;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import static org.apache.hadoop.mapred.Queue.*;
-import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
-
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.Transformer;
@@ -53,13 +43,18 @@ import java.util.Properties;
 import java.util.Set;
 import java.io.File;
 import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
 
 //@Private
 public class QueueManagerTestUtils {
-  final static String CONFIG = new File("test-mapred-queues.xml")
-      .getAbsolutePath();
+  /**
+   * Queue-configuration file for tests that start a cluster and wish to modify
+   * the queue configuration. This file is always in the unit tests classpath,
+   * so QueueManager started through JobTracker will automatically pick this up.
+   */
+  public static final String QUEUES_CONFIG_FILE_PATH = new File(System
+      .getProperty("test.build.extraconf", "build/test/extraconf"),
+      QueueManager.QUEUE_CONF_FILE_NAME).getAbsolutePath();
+
   private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
 
   /**
@@ -76,7 +71,7 @@ public class QueueManagerTestUtils {
   }
 
   public static void createSimpleDocument(Document doc) throws Exception {
-    Element queues = createQueuesNode(doc, "true");
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -106,8 +101,8 @@ public class QueueManagerTestUtils {
     queues.appendChild(p1);
   }
 
-  static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
-    Element queues = createQueuesNode(doc, aclsEnabled);
+  static void createSimpleDocumentWithAcls(Document doc) {
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -150,8 +145,56 @@ public class QueueManagerTestUtils {
     queues.appendChild(p1);
   }
 
+  /**
+   * Creates all given queues as 1st level queues(no nesting)
+   * @param doc         the queues config document
+   * @param queueNames  the queues to be added to the queues config document
+   * @param submitAcls  acl-submit-job acls for each of the queues
+   * @param adminsAcls  acl-administer-jobs acls for each of the queues
+   * @throws Exception
+   */
+  public static void createSimpleDocument(Document doc, String[] queueNames,
+      String[] submitAcls, String[] adminsAcls) throws Exception {
+
+    Element queues = createQueuesNode(doc);
+
+    // Create all queues as 1st level queues(no nesting)
+    for (int i = 0; i < queueNames.length; i++) {
+      Element q = createQueue(doc, queueNames[i]);
+
+      q.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+      q.appendChild(createAcls(doc,
+          QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, submitAcls[i]));
+      q.appendChild(createAcls(doc,
+          QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, adminsAcls[i]));
+      queues.appendChild(q);
+    }
+  }
+
+  /**
+   * Creates queues configuration file with given queues at 1st level(i.e.
+   * no nesting of queues) and with the given queue acls.
+   * @param queueNames        queue names which are to be configured
+   * @param submitAclStrings  acl-submit-job acls for each of the queues
+   * @param adminsAclStrings  acl-administer-jobs acls for each of the queues
+   * @return Configuration    the queues configuration
+   * @throws Exception
+   */
+  public static void createQueuesConfigFile(String[] queueNames,
+      String[] submitAclStrings, String[] adminsAclStrings)
+  throws Exception {
+    if (queueNames.length > submitAclStrings.length ||
+        queueNames.length > adminsAclStrings.length) {
+      LOG.error("Number of queues is more than acls given.");
+      return;
+    }
+    Document doc = createDocument();
+    createSimpleDocument(doc, queueNames, submitAclStrings, adminsAclStrings);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+  }
+
   public static void refreshSimpleDocument(Document doc) throws Exception {
-    Element queues = createQueuesNode(doc, "true");
+    Element queues = createQueuesNode(doc);
 
     // Create parent level queue q1.
     Element q1 = createQueue(doc, "q1");
@@ -189,10 +232,9 @@ public class QueueManagerTestUtils {
    * @param enable
    * @return the created element.
    */
-  public static Element createQueuesNode(Document doc, String enable) {
+  public static Element createQueuesNode(Document doc) {
     Element queues = doc.createElement("queues");
     doc.appendChild(queues);
-    queues.setAttribute("aclsEnabled", enable);
     return queues;
   }
 
@@ -240,9 +282,12 @@ public class QueueManagerTestUtils {
     return propsElement;
   }
 
-  public static void checkForConfigFile() {
-    if (new File(CONFIG).exists()) {
-      new File(CONFIG).delete();
+  /**
+   * Delete queues configuration file if exists
+   */
+  public static void deleteQueuesConfigFile() {
+    if (new File(QUEUES_CONFIG_FILE_PATH).exists()) {
+      new File(QUEUES_CONFIG_FILE_PATH).delete();
     }
   }
 
@@ -257,7 +302,7 @@ public class QueueManagerTestUtils {
   public static void writeQueueConfigurationFile(String filePath,
       JobQueueInfo[] rootQueues) throws Exception {
     Document doc = createDocument();
-    Element queueElements = createQueuesNode(doc, String.valueOf(true));
+    Element queueElements = createQueuesNode(doc);
     for (JobQueueInfo rootQ : rootQueues) {
       queueElements.appendChild(QueueConfigurationParser.getQueueElement(doc,
           rootQ));
@@ -265,27 +310,6 @@ public class QueueManagerTestUtils {
     writeToFile(doc, filePath);
   }
 
-  static class QueueManagerConfigurationClassLoader extends ClassLoader {
-    @Override
-    public URL getResource(String name) {
-      if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
-        return super.getResource(name);
-      } else {
-        File resourceFile = new File(CONFIG);
-        if (!resourceFile.exists()) {
-          throw new IllegalStateException(
-              "Queue Manager configuration file not found");
-        }
-        try {
-          return resourceFile.toURL();
-        } catch (MalformedURLException e) {
-          LOG.fatal("Unable to form URL for the resource file : ");
-        }
-        return super.getResource(name);
-      }
-    }
-  }
-
   static Job submitSleepJob(final int numMappers, final int numReducers, final long mapSleepTime,
       final long reduceSleepTime, boolean shouldComplete, String userInfo,
       String queueName, Configuration clientConf) throws IOException,
@@ -329,12 +353,4 @@ public class QueueManagerTestUtils {
   }
 
   static MiniMRCluster miniMRCluster;
-
-  static void setUpCluster(Configuration conf) throws IOException {
-    JobConf jobConf = new JobConf(conf);
-    String namenode = "file:///";
-    Thread.currentThread().setContextClassLoader(
-        new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
-    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
-  }
 }

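Tests no longer swap in a custom ClassLoader to expose a queue configuration; they write mapred-queues.xml straight into the test classpath (test.build.extraconf) so a QueueManager started through the JobTracker picks it up. createQueuesConfigFile() builds a flat, single-level hierarchy with per-queue submit and administer ACLs. A usage sketch with illustrative queue and ACL values:

package org.apache.hadoop.mapred;

class QueueTestSetupSketch {
  static void setUpQueues() throws Exception {
    // Start clean, then generate mapred-queues.xml under test.build.extraconf.
    QueueManagerTestUtils.deleteQueuesConfigFile();
    QueueManagerTestUtils.createQueuesConfigFile(
        new String[] { "default", "q1" },      // queue names
        new String[] { "userA", "userB" },     // acl-submit-job per queue
        new String[] { "adminA", "adminB" });  // acl-administer-jobs per queue
  }
}
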
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Fri Sep 17 07:34:39 2010
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -589,7 +590,7 @@ public class TestJobHistory extends Test
     
     // Also JobACLs should be correct
     if (mr.getJobTrackerRunner().getJobTracker()
-        .isJobLevelAuthorizationEnabled()) {
+        .areACLsEnabled()) {
       AccessControlList acl = new AccessControlList(
           conf.get(JobACL.VIEW_JOB.getAclName(), " "));
       assertTrue("VIEW_JOB ACL is not properly logged to history file.",
@@ -601,6 +602,9 @@ public class TestJobHistory extends Test
           acl.toString().equals(
           jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString()));
     }
+    
+    // Validate the job queue name
+    assertTrue(jobInfo.getJobQueueName().equals(conf.getQueueName()));
   }
 
   public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
@@ -726,10 +730,11 @@ public class TestJobHistory extends Test
   /** Run a job that will be succeeded and validate its history file format
    *  and its content.
    */
-  public void testJobHistoryFile() throws IOException {
+  public void testJobHistoryFile() throws Exception {
     MiniMRCluster mr = null;
     try {
       JobConf conf = new JobConf();
+
       // keep for less time
       conf.setLong("mapred.jobtracker.retirejob.check", 1000);
       conf.setLong("mapred.jobtracker.retirejob.interval", 1000);
@@ -739,7 +744,7 @@ public class TestJobHistory extends Test
       conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
 
       // Enable ACLs so that they are logged to history
-      conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+      conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 
@@ -966,7 +971,8 @@ public class TestJobHistory extends Test
       Map<JobACL, AccessControlList> jobACLs =
           new HashMap<JobACL, AccessControlList>();
       JobSubmittedEvent jse =
-        new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs);
+        new JobSubmittedEvent(jobId, "job", "user", 12345, "path", jobACLs,
+        "default");
       jh.logEvent(jse, jobId);
       jh.closeWriter(jobId);
 

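A short sketch (not part of this commit) of the configuration switch reflected above: cluster and queue ACLs are now enabled through MRConfig.MR_ACLS_ENABLED on the JobConf handed to the cluster, replacing the old job-level authorization flag.

  JobConf conf = new JobConf();
  // Enable ACLs so that they are enforced by the cluster and logged to history.
  conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
  MiniMRCluster mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
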
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Sep 17 07:34:39 2010
@@ -63,6 +63,7 @@ public class TestJobHistoryParsing  exte
                     "~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:\"'ZXCVBNM<>?" +
                     "\t\b\n\f\"\n in it";
 
+    String weirdJobQueueName = "my\njob\nQueue\\";
     conf.setUser(username);
 
     MiniMRCluster mr = null;
@@ -84,7 +85,7 @@ public class TestJobHistoryParsing  exte
     jobACLs.put(JobACL.MODIFY_JOB, modifyJobACL);
     JobSubmittedEvent jse =
         new JobSubmittedEvent(jobId, weirdJob, username, 12345, weirdPath,
-            jobACLs);
+            jobACLs, weirdJobQueueName);
     jh.logEvent(jse, jobId);
 
     JobFinishedEvent jfe =
@@ -121,6 +122,7 @@ public class TestJobHistoryParsing  exte
 
     assertTrue (jobInfo.getUsername().equals(username));
     assertTrue(jobInfo.getJobname().equals(weirdJob));
+    assertTrue(jobInfo.getJobQueueName().equals(weirdJobQueueName));
     assertTrue(jobInfo.getJobConfPath().equals(weirdPath));
     Map<JobACL, AccessControlList> parsedACLs = jobInfo.getJobACLs();
     assertEquals(2, parsedACLs.size());

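For illustration, a sketch (not part of this commit) of the queue-name round trip exercised above, assuming jobId, jh and the parsed jobInfo come from the surrounding test setup: the JobSubmittedEvent constructor now takes the queue name as its last argument, and the parsed job info returns it via getJobQueueName().

  Map<JobACL, AccessControlList> jobACLs = new HashMap<JobACL, AccessControlList>();
  JobSubmittedEvent jse =
      new JobSubmittedEvent(jobId, "myJob", "someUser", 12345, "someJobConfPath",
          jobACLs, "default");
  jh.logEvent(jse, jobId);
  // ... write out and re-parse the history file as the tests above do ...
  assertEquals("default", jobInfo.getJobQueueName());
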
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java Fri Sep 17 07:34:39 2010
@@ -17,31 +17,31 @@
  */
 package org.apache.hadoop.mapred;
 
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.QUEUES_CONFIG_FILE_PATH;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.deleteQueuesConfigFile;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
-import java.io.File;
-import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.QueueInfo;
+import org.junit.After;
 import org.junit.Test;
 import org.w3c.dom.Document;
 
 public class TestJobQueueClient {
+
+  @After
+  public void tearDown() throws Exception {
+    deleteQueuesConfigFile();
+  }
+
   @Test
   public void testQueueOrdering() throws Exception {
     // create some sample queues in a hierarchy..
@@ -91,13 +91,15 @@ public class TestJobQueueClient {
   
   @Test
   public void testGetQueue() throws Exception {
-    checkForConfigFile();
+
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc, "true");
-    writeToFile(doc, CONFIG);
-    Configuration conf = new Configuration();
-    conf.addResource(CONFIG);
-    setUpCluster(conf);
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    JobConf jobConf = new JobConf();
+    String namenode = "file:///";
+    miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+
     JobClient jc = new JobClient(miniMRCluster.createJobConf());
     // test for existing queue
     QueueInfo queueInfo = jc.getQueueInfo("q1");
@@ -105,7 +107,5 @@ public class TestJobQueueClient {
     // try getting a non-existing queue
     queueInfo = jc.getQueueInfo("queue");
     assertNull(queueInfo);
-
-    new File(CONFIG).delete();
   }
 }

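Condensed sketch (not part of this commit) of the pattern testGetQueue() now follows: the removed setUpCluster()/custom class loader is replaced by writing the queues file and starting the MiniMRCluster directly ("no-such-queue" below is just a placeholder name):

  deleteQueuesConfigFile();
  Document doc = createDocument();
  createSimpleDocumentWithAcls(doc);                 // no "true"/"false" argument any more
  writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
  miniMRCluster = new MiniMRCluster(0, "file:///", 3, null, null, new JobConf());
  JobClient jc = new JobClient(miniMRCluster.createJobConf());
  assertNull(jc.getQueueInfo("no-such-queue"));      // unknown queues come back as null
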
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Sep 17 07:34:39 2010
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -202,6 +201,13 @@ public class TestLocalizationWithLinuxTa
     File jobLogDir = TaskLog.getJobDir(jobId);
     checkFilePermissions(jobLogDir.toString(), expectedDirPerms, task.getUser(),
         ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    // check job-acls.xml file permissions
+    checkFilePermissions(jobLogDir.toString() + Path.SEPARATOR
+        + TaskTracker.jobACLsFile, expectedFilePerms, task.getUser(),
+        ClusterWithLinuxTaskController.taskTrackerSpecialGroup);
+    
+    // validate the content of job ACLs file
+    validateJobACLsFileContent();
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java Fri Sep 17 07:34:39 2010
@@ -18,8 +18,11 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import javax.security.auth.login.LoginException;
 import junit.framework.TestCase;
+
+import org.apache.hadoop.mapreduce.MRConfig;
+
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -31,58 +34,38 @@ public class TestQueueAclsForCurrentUser
   private QueueManager queueManager;
   private JobConf conf = null;
   UserGroupInformation currentUGI = null;
-  String submitAcl = Queue.QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = Queue.QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
+
+  @Override
+  protected void tearDown() {
+    deleteQueuesConfigFile();
+  }
 
-  private void setupConfForNoAccess() throws IOException,LoginException {
+  // The user currentUGI has no access to any of the queues
+  private void setupConfForNoAccess() throws Exception {
     currentUGI = UserGroupInformation.getLoginUser();
     String userName = currentUGI.getUserName();
-    conf = new JobConf();
 
-    conf.setBoolean("mapred.acls.enabled",true);
+    String[] queueNames = {"qu1", "qu2"};
+    // Only user u1 has access for queue qu1
+    // Only group g2 has acls for the queue qu2
+    createQueuesConfigFile(
+        queueNames, new String[]{"u1", " g2"}, new String[]{"u1", " g2"});
 
-    conf.set("mapred.queue.names", "qu1,qu2");
-    //Only user u1 has access
-    conf.set("mapred.queue.qu1.acl-submit-job", "u1");
-    conf.set("mapred.queue.qu1.acl-administer-jobs", "u1");
-    //q2 only group g2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", " g2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", " g2");
-    queueManager = new QueueManager(conf);
+    conf = new JobConf();
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
 
+    queueManager = new QueueManager(conf);
   }
 
   /**
    *  sets up configuration for acls test.
    * @return
    */
-  private void setupConf(boolean aclSwitch) throws IOException,LoginException{
+  private void setupConf(boolean aclSwitch) throws Exception{
     currentUGI = UserGroupInformation.getLoginUser();
     String userName = currentUGI.getUserName();
-    conf = new JobConf();
-
-    conf.setBoolean("mapred.acls.enabled", aclSwitch);
-
-    conf.set("mapred.queue.names", "qu1,qu2,qu3,qu4,qu5,qu6,qu7");
-    //q1 Has acls for all the users, supports both submit and administer
-    conf.set("mapred.queue.qu1.acl-submit-job", "*");
-    conf.set("mapred.queue.qu1-acl-administer-jobs", "*");
-    //q2 only u2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", "u2");
-    //q3  Only u2 has submit operation access rest all have administer access
-    conf.set("mapred.queue.qu3.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu3.acl-administer-jobs", "*");
-    //q4 Only u2 has administer access , anyone can do submit
-    conf.set("mapred.queue.qu4.acl-submit-job", "*");
-    conf.set("mapred.queue.qu4.acl-administer-jobs", "u2");
-    //qu6 only current user has submit access
-    conf.set("mapred.queue.qu6.acl-submit-job",userName);
-    conf.set("mapred.queue.qu6.acl-administrator-jobs","u2");
-    //qu7 only current user has administrator access
-    conf.set("mapred.queue.qu7.acl-submit-job","u2");
-    conf.set("mapred.queue.qu7.acl-administrator-jobs",userName);
-    //qu8 only current group has access
     StringBuilder groupNames = new StringBuilder("");
     String[] ugiGroupNames = currentUGI.getGroupNames();
     int max = ugiGroupNames.length-1;
@@ -92,22 +75,38 @@ public class TestQueueAclsForCurrentUser
         groupNames.append(",");
       }
     }
-    conf.set("mapred.queue.qu5.acl-submit-job"," "+groupNames.toString());
-    conf.set("mapred.queue.qu5.acl-administrator-jobs"," "
-            +groupNames.toString());
+    String groupsAcl = " " + groupNames.toString();
+
+    //qu1: all users may both submit and administer
+    //qu2: only user u2 may submit and administer
+    //qu3: only u2 may submit; everyone may administer
+    //qu4: anyone may submit; only u2 may administer
+    //qu5: only the current user's groups have access
+    //qu6: only the current user may submit; u2 may administer
+    //qu7: u2 may submit; only the current user may administer
+    String[] queueNames =
+        {"qu1", "qu2", "qu3", "qu4", "qu5", "qu6", "qu7"};
+    String[] submitAcls =
+        {"*", "u2", "u2", "*", groupsAcl, userName, "u2"};
+    String[] adminsAcls =
+        {"*", "u2", "*", "u2", groupsAcl, "u2", userName};
+    createQueuesConfigFile(queueNames, submitAcls, adminsAcls);
+
+    conf = new JobConf();
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, aclSwitch);
 
     queueManager = new QueueManager(conf);
   }
 
-  public void testQueueAclsForCurrentuser() throws IOException,LoginException {
+  public void testQueueAclsForCurrentuser() throws Exception {
     setupConf(true);
     QueueAclsInfo[] queueAclsInfoList =
             queueManager.getQueueAcls(currentUGI);
     checkQueueAclsInfo(queueAclsInfoList);
   }
 
-  public void testQueueAclsForCurrentUserAclsDisabled() throws IOException,
-          LoginException {
+  // Acls are disabled on the mapreduce cluster
+  public void testQueueAclsForCurrentUserAclsDisabled() throws Exception {
     setupConf(false);
     //fetch the acls info for current user.
     QueueAclsInfo[] queueAclsInfoList = queueManager.
@@ -115,7 +114,7 @@ public class TestQueueAclsForCurrentUser
     checkQueueAclsInfo(queueAclsInfoList);
   }
 
-  public void testQueueAclsForNoAccess() throws IOException,LoginException {
+  public void testQueueAclsForNoAccess() throws Exception {
     setupConfForNoAccess();
     QueueAclsInfo[] queueAclsInfoList = queueManager.
             getQueueAcls(currentUGI);
@@ -124,7 +123,7 @@ public class TestQueueAclsForCurrentUser
 
   private void checkQueueAclsInfo(QueueAclsInfo[] queueAclsInfoList)
           throws IOException {
-    if (conf.get("mapred.acls.enabled").equalsIgnoreCase("true")) {
+    if (conf.get(MRConfig.MR_ACLS_ENABLED).equalsIgnoreCase("true")) {
       for (int i = 0; i < queueAclsInfoList.length; i++) {
         QueueAclsInfo acls = queueAclsInfoList[i];
         String queueName = acls.getQueueName();

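A minimal sketch (not part of this commit) of the helper-driven setup used above: createQueuesConfigFile takes parallel arrays of queue names, submit-job ACLs and administer-jobs ACLs (a value starting with a space lists groups only), while enforcement is controlled separately via MRConfig.MR_ACLS_ENABLED.

  String[] queueNames = {"qu1", "qu2"};
  String[] submitAcls = {"u1", " g2"};   // qu1: only user u1; qu2: only group g2
  String[] adminAcls  = {"u1", " g2"};
  createQueuesConfigFile(queueNames, submitAcls, adminAcls);

  JobConf conf = new JobConf();
  conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);   // ACL checks apply only when enabled
  QueueManager queueManager = new QueueManager(conf);
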
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep 17 07:34:39 2010
@@ -22,8 +22,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
 import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -31,7 +34,6 @@ import org.junit.After;
 import org.junit.Test;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
-import java.io.File;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -47,26 +49,32 @@ public class TestQueueManager {
 
   @After
   public void tearDown() throws Exception {
-    new File(CONFIG).delete();
+    deleteQueuesConfigFile();
+  }
+
+  // create UGI with the given user name and the fixed group name "myGroup"
+  private UserGroupInformation createUGI(String userName) {
+    return UserGroupInformation.createUserForTesting(
+        userName, new String[]{"myGroup"});
   }
 
   @Test
   public void testDefault() throws Exception {
+    deleteQueuesConfigFile();
     QueueManager qm = new QueueManager();
     Queue root = qm.getRoot();
     assertEquals(root.getChildren().size(), 1);
     assertEquals(root.getChildren().iterator().next().getName(), "default");
-    assertFalse(qm.isAclsEnabled());
     assertNull(root.getChildren().iterator().next().getChildren());
   }
 
   @Test
   public void testXMLParsing() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     Set<Queue> rootQueues = qm.getRoot().getChildren();
     List<String> names = new ArrayList<String>();
     for (Queue q : rootQueues) {
@@ -101,62 +109,63 @@ public class TestQueueManager {
 
     assertTrue(
       q.getAcls().get(
-        QueueManager.toFullPropertyName(
+          toFullPropertyName(
           q.getName(), ACL_SUBMIT_JOB_TAG)).isUserAllowed(
-        UserGroupInformation.createRemoteUser("u1")));
+              createUGI("u1")));
 
     assertTrue(
       q.getAcls().get(
-        QueueManager.toFullPropertyName(
+          toFullPropertyName(
           q.getName(),
           ACL_ADMINISTER_JOB_TAG))
-        .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+        .isUserAllowed(createUGI("u2")));
     assertTrue(q.getState().equals(QueueState.STOPPED));
   }
 
   @Test
   public void testhasAccess() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    createSimpleDocumentWithAcls(doc,"true");
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    createSimpleDocumentWithAcls(doc);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
     UserGroupInformation ugi;
     // test for acls access when acls are set with *
-    ugi = UserGroupInformation.createRemoteUser("u1");
+    ugi = createUGI("u1");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+        QueueACL.ADMINISTER_JOBS, ugi));
     
     // test for acls access when acls are not set with *
-    ugi = UserGroupInformation.createRemoteUser("u1");
+    ugi = createUGI("u1");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
     assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+        QueueACL.ADMINISTER_JOBS, ugi));
     
-    // test for acls access when acls are not specified but acls is enabled
-    ugi = UserGroupInformation.createRemoteUser("u1");
-    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
-    ugi = UserGroupInformation.createRemoteUser("u2");
-    assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
-        Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+    // Test for acls access when acls are not specified but acls are enabled.
+    // By default, the queue acls for any queue are empty.
+    ugi = createUGI("u1");
+    assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        QueueACL.SUBMIT_JOB, ugi));
+    ugi = createUGI("u2");
+    assertFalse(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+        QueueACL.ADMINISTER_JOBS, ugi));
     
     assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
   }
   
   @Test
   public void testQueueView() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     
     for (Queue queue : qm.getRoot().getChildren()) {
       checkHierarchy(queue, qm);
@@ -176,25 +185,21 @@ public class TestQueueManager {
 
   @Test
   public void testhasAccessForParent() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
-    UserGroupInformation ugi =
-      UserGroupInformation.createRemoteUser("u1");
-    assertFalse(
-      qm.hasAccess(
-        "p1",
-        Queue.QueueOperation.SUBMIT_JOB, ugi));
+    UserGroupInformation ugi = createUGI("u1");
+    assertFalse(qm.hasAccess("p1", QueueACL.SUBMIT_JOB, ugi));
   }
 
   @Test
   public void testValidation() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "q1");
 
     q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
@@ -203,9 +208,9 @@ public class TestQueueManager {
     q1.appendChild(createQueue(doc, "p16"));
 
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
       fail("Should throw an exception as configuration is wrong ");
     } catch (RuntimeException re) {
       LOG.info(re.getMessage());
@@ -214,27 +219,27 @@ public class TestQueueManager {
 
   @Test
   public void testInvalidName() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
       fail("Should throw an exception as configuration is wrong ");
     } catch (Exception re) {
       re.printStackTrace();
       LOG.info(re.getMessage());
     }
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
-    queues = createQueuesNode(doc, "false");
+    queues = createQueuesNode(doc);
     q1 = doc.createElement("queue");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception as configuration is wrong ");
     } catch (RuntimeException re) {
       re.printStackTrace();
@@ -244,10 +249,10 @@ public class TestQueueManager {
 
   @Test
   public void testMissingConfigFile() throws Exception {
-    checkForConfigFile(); // deletes file
+    deleteQueuesConfigFile(); // deletes file
 
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception for missing file when " +
            "explicitly passed.");
     } catch (RuntimeException re) {
@@ -266,9 +271,9 @@ public class TestQueueManager {
 
   @Test
   public void testEmptyProperties() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "q1");
     Element p = createProperties(doc, null);
     q1.appendChild(p);
@@ -277,11 +282,11 @@ public class TestQueueManager {
 
   @Test
   public void testEmptyFile() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      new QueueManager(CONFIG);
+      new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
       fail("Should throw an exception as configuration is wrong ");
     } catch (Exception re) {
       re.printStackTrace();
@@ -291,11 +296,11 @@ public class TestQueueManager {
 
   @Test
   public void testJobQueueInfoGeneration() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
 
     List<JobQueueInfo> rootQueues =
       qm.getRoot().getJobQueueInfo().getChildren();
@@ -338,11 +343,11 @@ public class TestQueueManager {
    */
   @Test
   public void testRefresh() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, true);
     Queue beforeRefreshRoot = qm.getRoot();
     //remove the file and create new one.
     Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
@@ -360,16 +365,16 @@ public class TestQueueManager {
             "p1" + NAME_SEPARATOR + "p12")) {
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(), ACL_SUBMIT_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u1")));
+                .isUserAllowed(createUGI("u1")));
 
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_ADMINISTER_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u2")));
+                .isUserAllowed(createUGI("u2")));
             assertTrue(child.getState().equals(QueueState.STOPPED));
           } else {
             assertTrue(child.getState().equals(QueueState.RUNNING));
@@ -377,11 +382,11 @@ public class TestQueueManager {
         }
       }
     }
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
     refreshSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, true);
     qm.getRoot().isHierarchySameAs(cp.getRoot());
     qm.setQueues(
       cp.getRoot().getChildren().toArray(
@@ -403,17 +408,17 @@ public class TestQueueManager {
             "p1" + NAME_SEPARATOR + "p12")) {
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_SUBMIT_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u3")));
+                .isUserAllowed(createUGI("u3")));
 
             assertTrue(
               child.getAcls().get(
-                QueueManager.toFullPropertyName(
+                  toFullPropertyName(
                   child.getName(),
                   ACL_ADMINISTER_JOB_TAG))
-                .isUserAllowed(UserGroupInformation.createRemoteUser("u4")));
+                .isUserAllowed(createUGI("u4")));
             assertTrue(child.getState().equals(QueueState.RUNNING));
           } else {
             assertTrue(child.getState().equals(QueueState.STOPPED));
@@ -425,20 +430,20 @@ public class TestQueueManager {
 
   @Test
   public void testRefreshWithInvalidFile() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);
-    writeToFile(doc, CONFIG);
-    QueueManager qm = new QueueManager(CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+    QueueManager qm = new QueueManager(QUEUES_CONFIG_FILE_PATH, false);
 
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     doc = createDocument();
-    Element queues = createQueuesNode(doc, "false");
+    Element queues = createQueuesNode(doc);
     Element q1 = createQueue(doc, "");
     queues.appendChild(q1);
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
     try {
-      QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+      QueueConfigurationParser cp = new QueueConfigurationParser(QUEUES_CONFIG_FILE_PATH, false);
 
       fail("Should throw an exception as configuration is wrong ");
     } catch (Throwable re) {
@@ -548,19 +553,21 @@ public class TestQueueManager {
    */
   @Test
   public void testDumpConfiguration() throws Exception {
-    checkForConfigFile();
+    deleteQueuesConfigFile();
     Document doc = createDocument();
     createSimpleDocument(doc);    
-    writeToFile(doc, CONFIG);
+    writeToFile(doc, QUEUES_CONFIG_FILE_PATH);
+
     StringWriter out = new StringWriter();
-    QueueManager.dumpConfiguration(out,CONFIG,null);
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    QueueManager.dumpConfiguration(out, QUEUES_CONFIG_FILE_PATH, conf);
+
     ObjectMapper mapper = new ObjectMapper();
     // parse the Json dump
     JsonQueueTree queueTree =
       mapper.readValue(out.toString(), JsonQueueTree.class);
-    
-    // check if acls_enabled is correct
-    assertEquals(true, queueTree.isAcls_enabled());
+
     // check for the number of top-level queues
     assertEquals(2, queueTree.getQueues().length);