Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/09/17 09:34:40 UTC

svn commit: r998003 [1/3] - in /hadoop/mapreduce/trunk: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/ src/docs/src...

Author: vinodkv
Date: Fri Sep 17 07:34:39 2010
New Revision: 998003

URL: http://svn.apache.org/viewvc?rev=998003&view=rev
Log:
MAPREDUCE-1664. Job Acls affect Queue Acls.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Operation.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueACL.java
Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/conf/mapred-queues.xml.template
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
    hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
    hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/mapred-queues-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerRefresh.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserLogCleanup.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CurrentJHParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 17 07:34:39 2010
@@ -7,6 +7,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1866. Removes deprecated class
     org.apache.hadoop.streaming.UTF8ByteArrayUtils. (amareshwari)
 
+    MAPREDUCE-1664. Changes the behaviour of the combination of job-acls
+    when they function together with queue-acls. (Ravi Gummadi via vinodkv)
+
   NEW FEATURES
 
     MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.

Modified: hadoop/mapreduce/trunk/conf/mapred-queues.xml.template
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/mapred-queues.xml.template?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/mapred-queues.xml.template (original)
+++ hadoop/mapreduce/trunk/conf/mapred-queues.xml.template Fri Sep 17 07:34:39 2010
@@ -18,10 +18,10 @@
 <!-- This is the template for queue configuration. The format supports nesting of
      queues within queues - a feature called hierarchical queues. All queues are
      defined within the 'queues' tag which is the top level element for this
-     XML document.
-     The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
-     on queue operations such as submitting jobs, killing jobs etc. -->
-<queues aclsEnabled="false">
+     XML document. The queue acls configured here for different queues are
+     checked for authorization only if the configuration property
+     mapreduce.cluster.acls.enabled is set to true. -->
+<queues>
 
   <!-- Configuration for a queue is specified by defining a 'queue' element. -->
   <queue>
@@ -40,17 +40,37 @@
 
     <!-- Specifies the ACLs to check for submitting jobs to this queue.
          If set to '*', it allows all users to submit jobs to the queue.
+         If set to ' '(i.e. space), no user will be allowed to do this
+         operation. The default value for any queue acl is ' '.
          For specifying a list of users and groups the format to use is
-         user1,user2 group1,group2 -->
-    <acl-submit-job>*</acl-submit-job>
+         user1,user2 group1,group2
 
-    <!-- Specifies the ACLs to check for modifying jobs in this queue.
-         Modifications include killing jobs, tasks of jobs or changing
+         It is only used if authorization is enabled in Map/Reduce by setting
+         the configuration property mapreduce.cluster.acls.enabled to true.
+
+         Irrespective of this ACL configuration, the user who started the
+         cluster and cluster administrators configured via
+         mapreduce.cluster.administrators can do this operation. -->
+    <acl-submit-job> </acl-submit-job>
+
+    <!-- Specifies the ACLs to check for viewing and modifying jobs in this
+         queue. Modifications include killing jobs, tasks of jobs or changing
          priorities.
-         If set to '*', it allows all users to submit jobs to the queue.
+         If set to '*', it allows all users to view, modify jobs of the queue.
+         If set to ' '(i.e. space), no user will be allowed to do this
+         operation.
          For specifying a list of users and groups the format to use is
-         user1,user2 group1,group2 -->
-    <acl-administer-jobs>*</acl-administer-jobs>
+         user1,user2 group1,group2
+
+         It is only used if authorization is enabled in Map/Reduce by setting
+         the configuration property mapreduce.cluster.acls.enabled to true.
+
+         Irrespective of this ACL configuration, the user who started the
+         cluster and cluster administrators configured via
+         mapreduce.cluster.administrators can do the above operations on all
+         the jobs in all the queues. The job owner can do all the above
+         operations on his/her job irrespective of this ACL configuration. -->
+    <acl-administer-jobs> </acl-administer-jobs>
   </queue>
 
   <!-- Here is a sample of a hierarchical queue configuration
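
A minimal sketch, not part of this commit, of how the three ACL string forms described in the template above (' ', '*', and "user1,user2 group1,group2") behave once parsed by org.apache.hadoop.security.authorize.AccessControlList, the same class this patch uses for queue and job ACLs. The class name and the user/group names below are hypothetical.

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;

public class QueueAclFormatSketch {
  public static void main(String[] args) {
    UserGroupInformation alice = UserGroupInformation.createRemoteUser("alice");

    // ' ' (a single space) means nobody is allowed via this ACL.
    System.out.println(new AccessControlList(" ").isUserAllowed(alice));               // false
    // '*' allows everyone.
    System.out.println(new AccessControlList("*").isUserAllowed(alice));               // true
    // "user1,user2 group1,group2" allows the listed users and groups.
    System.out.println(new AccessControlList("alice,bob analysts").isUserAllowed(alice)); // true
  }
}

Note that the job owner, the user who started the cluster and cluster administrators are allowed separately (by the ACLsManager added later in this patch), irrespective of these strings.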

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.c?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.c Fri Sep 17 07:34:39 2010
@@ -227,6 +227,14 @@ char *get_job_log_dir(const char *log_di
 }
 
 /**
+ * Get the job ACLs file for the given job log dir.
+ */
+char *get_job_acls_file(const char *log_dir) {
+  return concatenate(JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN, "job_acls_file",
+                     1, log_dir);
+}
+
+/**
  * Function to check if the passed tt_root is present in mapreduce.cluster.local.dir
  * the task-controller is configured with.
  */
@@ -517,12 +525,20 @@ int prepare_attempt_directories(const ch
 }
 
 /**
- * Function to prepare the job log dir for the child. It gives the user
- * ownership of the job's log-dir to the user and group ownership to the
- * user running tasktracker.
- *     *  sudo chown user:mapred log-dir/userlogs/$jobid
- *     *  sudo chmod -R 2770 log-dir/userlogs/$jobid // user is same as tt_user
- *     *  sudo chmod -R 2570 log-dir/userlogs/$jobid // user is not tt_user
+ * Function to prepare the job log dir(and job acls file in it) for the child.
+ * It gives the user ownership of the job's log-dir to the user and
+ * group ownership to the user running tasktracker(i.e. tt_user).
+ *
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid
+ *   *  sudo chown user:mapred log-dir/userlogs/$jobid/job-acls.xml
+ *   *    if user is not $tt_user,
+ *   *      sudo chmod 2570 log-dir/userlogs/$jobid/job-acls.xml
+ *   *    else
+ *   *      sudo chmod 2770 log-dir/userlogs/$jobid/job-acls.xml 
  */
 int prepare_job_logs(const char *log_dir, const char *job_id,
     mode_t permissions) {
@@ -559,6 +575,42 @@ int prepare_job_logs(const char *log_dir
     free(job_log_dir);
     return -1;
   }
+
+  //set ownership and permissions for job_log_dir/job-acls.xml, if exists.
+  char *job_acls_file = get_job_acls_file(job_log_dir);
+  if (job_acls_file == NULL) {
+    fprintf(LOGFILE, "Couldn't get job acls file %s.\n", job_acls_file);
+    free(job_log_dir);
+    return -1; 
+  }
+
+  struct stat filestat1;
+  if (stat(job_acls_file, &filestat1) != 0) {
+    if (errno == ENOENT) {
+#ifdef DEBUG
+      fprintf(LOGFILE, "job_acls_file %s doesn't exist. Not doing anything.\n",
+          job_acls_file);
+#endif
+      free(job_acls_file);
+      free(job_log_dir);
+      return 0;
+    } else {
+      // stat failed because of something else!
+      fprintf(LOGFILE, "Failed to stat the job_acls_file %s\n", job_acls_file);
+      free(job_acls_file);
+      free(job_log_dir);
+      return -1;
+    }
+  }
+
+  if (secure_single_path(job_acls_file, user_detail->pw_uid, tasktracker_gid,
+      permissions, 1) != 0) {
+    fprintf(LOGFILE, "Failed to secure the job acls file %s\n", job_acls_file);
+    free(job_acls_file);
+    free(job_log_dir);
+    return -1;
+  }
+  free(job_acls_file);
   free(job_log_dir);
   return 0;
 }

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/task-controller.h?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/task-controller.h Fri Sep 17 07:34:39 2010
@@ -90,6 +90,8 @@ enum errorcodes {
 
 #define JOB_LOG_DIR_PATTERN "%s/userlogs/%s"
 
+#define JOB_LOG_DIR_TO_JOB_ACLS_FILE_PATTERN "%s/job-acls.xml"
+
 #define ATTEMPT_LOG_DIR_PATTERN JOB_LOG_DIR_PATTERN"/%s"
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"

Modified: hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/mapreduce/trunk/src/c++/task-controller/tests/test-task-controller.c Fri Sep 17 07:34:39 2010
@@ -183,6 +183,19 @@ void test_get_job_log_dir() {
   assert(ret == 0);
 }
 
+void test_get_job_acls_file() {
+  char *job_acls_file = (char *) get_job_acls_file(
+    "/tmp/testing/userlogs/job_200906101234_0001");
+  printf("job acls file obtained is %s\n", job_acls_file);
+  int ret = 0;
+  if (strcmp(job_acls_file,
+    "/tmp/testing/userlogs/job_200906101234_0001/job-acls.xml") != 0) {
+    ret = -1;
+  }
+  free(job_acls_file);
+  assert(ret == 0);
+}
+
 void test_get_task_log_dir() {
   char *logdir = (char *) get_task_log_dir("/tmp/testing",
     "job_200906101234_0001", "attempt_200906112028_0001_m_000000_0");
@@ -219,6 +232,9 @@ int main(int argc, char **argv) {
   printf("\nTesting get_job_log_dir()\n");
   test_get_job_log_dir();
 
+  printf("\nTesting get_job_acls_file()\n");
+  test_get_job_acls_file();
+
   printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
 

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Fri Sep 17 07:34:39 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.QueueState;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -584,10 +585,8 @@ public class CapacityTestUtils {
       for (String queueName : queueNames) {
         HashMap<String, AccessControlList> aclsMap
           = new HashMap<String, AccessControlList>();
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = QueueManager.toFullPropertyName(
-            queueName,
-            oper.getAclName());
+        for (QueueACL qAcl : QueueACL.values()) {
+          String key = toFullPropertyName(queueName, qAcl.getAclName());
           aclsMap.put(key, allEnabledAcl);
         }
         queues[i++] = new Queue(queueName, aclsMap, QueueState.RUNNING);

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java Fri Sep 17 07:34:39 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.JobSt
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskReport;
@@ -410,6 +411,11 @@ public class MockSimulatorJobTracker imp
   }
 
   @Override
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException,
       InterruptedException {
     throw new UnsupportedOperationException();

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Sep 17 07:34:39 2010
@@ -1762,12 +1762,14 @@
         
         <section>
           <title>Job Authorization</title>
-          <p>Job level authorization is enabled on the cluster, if the configuration
-          <code>mapreduce.cluster.job-authorization-enabled</code> is set to
-          true. When enabled, access control checks are done by the JobTracker
-          and the TaskTracker before allowing users to view
-          job details or to modify a job using Map/Reduce APIs,
-          CLI or web user interfaces.</p>
+          <p>Job level authorization and queue level authorization are enabled
+          on the cluster, if the configuration
+          <code>mapreduce.cluster.acls.enabled</code> is set to
+          true. When enabled, access control checks are done by (a) the
+          JobTracker before allowing users to submit jobs to queues and
+          administering these jobs and (b) by the JobTracker and the TaskTracker
+          before allowing users to view job details or to modify a job using
+          MapReduce APIs, CLI or web user interfaces.</p>
          
           <p>A job submitter can specify access control lists for viewing or
           modifying a job via the configuration properties
@@ -1775,11 +1777,13 @@
           <code>mapreduce.job.acl-modify-job</code> respectively. By default, 
           nobody is given access in these properties.</p> 
           
-          <p>However, irrespective of the ACLs configured, a job's owner,
-          the superuser and the members of an admin configured supergroup
-          (<code>mapreduce.cluster.permissions.supergroup</code>) always
-          have access to view and modify a job.</p>
-          
+          <p>However, irrespective of the job ACLs configured, a job's owner,
+          the user who started the cluster and members of an admin configured
+          supergroup (<code>mapreduce.cluster.permissions.supergroup</code>)
+          and queue administrators of the queue to which the job was submitted
+          to (<code>acl-administer-jobs</code>) always have access to view and
+          modify a job.</p>
+
           <p> A job view ACL authorizes users against the configured 
           <code>mapreduce.job.acl-view-job</code> before returning possibly 
           sensitive information about a job, like: </p>
@@ -1801,10 +1805,13 @@
             <li> killing/failing a task of a job </li>
             <li> setting the priority of a job </li>
           </ul>
-          <p>These operations are also protected by the queue level ACL,
-          "acl-administer-jobs", configured via mapred-queue-acls.xml. The caller
-          will be authorized against both queue level ACLs and job level ACLs,
-          depending on what is enabled.</p>
+          <p>These view and modify operations on jobs are also permitted by
+          the queue level ACL, "acl-administer-jobs", configured via
+          mapred-queue-acls.xml. The caller will be able to do the operation
+          if he/she is part of either queue admins ACL or job modification ACL
+          or the user who started the cluster or a member of an admin configured
+          supergroup (<code>mapreduce.cluster.permissions.supergroup</code>).
+          </p>
           
           <p>The format of a job level ACL is the same as the format for a
           queue level ACL as defined in the

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Fri Sep 17 07:34:39 2010
@@ -923,7 +923,7 @@
   <name>mapreduce.job.queuename</name>
   <value>default</value>
   <description> Queue to which a job is submitted. This must match one of the
-    queues defined in mapred.queue.names for the system. Also, the ACL setup
+    queues defined in mapred-queues.xml for the system. Also, the ACL setup
     for the queue must allow the current user to submit a job to the queue.
     Before specifying a queue, ensure that the system is configured with 
     the queue, and access is allowed for submitting jobs to the queue.
@@ -931,69 +931,85 @@
 </property>
 
 <property>
-  <name>mapreduce.cluster.job-authorization-enabled</name>
+  <name>mapreduce.cluster.acls.enabled</name>
   <value>false</value>
-  <description> Boolean flag that specifies if job-level authorization checks
-  should be enabled on the jobs submitted to the cluster.  Job-level
-  authorization is enabled if this flag is set to true or disabled otherwise.
-  It is disabled by default. If enabled, access control checks are made by
-  JobTracker and TaskTracker when requests are made by users for viewing the
-  job-details (See mapreduce.job.acl-view-job) or for modifying the job
-  (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
-  console and web user interfaces.
+  <description> Specifies whether ACLs should be checked
+    for authorization of users for doing various queue and job level operations.
+    ACLs are disabled by default. If enabled, access control checks are made by
+    JobTracker and TaskTracker when requests are made by users for queue
+    operations like submit job to a queue and kill a job in the queue and job
+    operations like viewing the job-details (See mapreduce.job.acl-view-job)
+    or for modifying the job (See mapreduce.job.acl-modify-job) using
+    Map/Reduce APIs, RPCs or via the console and web user interfaces.
+    For enabling this flag(mapreduce.cluster.acls.enabled), this is to be set
+    to true in mapred-site.xml on JobTracker node and on all TaskTracker nodes.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.acl-modify-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'modifying' the job. It
     is only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapreduce.cluster.acls.enabled to true.
     This specifies the list of users and/or groups who can do modification
     operations on the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard all the modifications with respect to this
-    job and takes care of all the following operations:
+    users/groups to modify this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard all the modifications with respect
+    to this job and takes care of all the following operations:
       o killing this job
       o killing a task of this job, failing a task of this job
       o setting the priority of this job
     Each of these operations are also protected by the per-queue level ACL
     "acl-administer-jobs" configured via mapred-queues.xml. So a caller should
-    have the authorization to satisfy both the queue-level ACL and the
+    have the authorization to satisfy either the queue-level ACL or the
     job-level ACL.
 
-    Irrespective of this ACL configuration, job-owner, superuser and members
-    of supergroup configured on JobTracker via 
-    "mapreduce.cluster.permissions.supergroup",
-    can do all the modification operations.
-
-    By default, nobody else besides job-owner, superuser/supergroup can
-    perform modification operations on a job that they don't own.
+    Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+    started the cluster, (c) members of an admin configured supergroup
+    configured via mapreduce.cluster.permissions.supergroup and (d) queue
+    administrators of the queue to which this job was submitted to configured
+    via acl-administer-jobs for the specific queue in mapred-queues.xml can
+    do all the modification operations on a job.
+
+    By default, nobody else besides job-owner, the user who started the cluster,
+    members of supergroup and queue administrators can perform modification
+    operations on a job.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.acl-view-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'viewing' the job. It is
     only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapreduce.cluster.acls.enabled to true.
     This specifies the list of users and/or groups who can view private details
     about the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard some of the job-views and at present only
-    protects APIs that can return possibly sensitive information of the
-    job-owner like
+    users/groups to view this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard some of the job-views and at
+    present only protects APIs that can return possibly sensitive information
+    of the job-owner like
       o job-level counters
       o task-level counters
       o tasks' diagnostic information
       o task-logs displayed on the TaskTracker web-UI and
       o job.xml showed by the JobTracker's web-UI
-    Every other piece information of jobs is still accessible by any other
-    users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+    Every other piece of information of jobs is still accessible by any other
+    user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+
+    Irrespective of this ACL configuration, (a) job-owner, (b) the user who
+    started the cluster, (c) members of an admin configured supergroup
+    configured via mapreduce.cluster.permissions.supergroup and (d) queue
+    administrators of the queue to which this job was submitted to configured
+    via acl-administer-jobs for the specific queue in mapred-queues.xml can
+    do all the view operations on a job.
+
+    By default, nobody else besides job-owner, the user who started the
+    cluster, members of supergroup and queue administrators can perform
+    view operations on a job.
   </description>
 </property>
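
A brief hypothetical illustration, not part of this commit, of a job submitter setting the two per-job ACL properties described above. The property names are taken from the descriptions; the class name and the user/group names are made up. The ACLs only take effect when mapreduce.cluster.acls.enabled is set to true on the JobTracker and all TaskTrackers.

import org.apache.hadoop.mapred.JobConf;

public class JobAclSubmissionSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Let users alice, bob and the group analysts view counters, task logs
    // and job.xml of this job.
    conf.set("mapreduce.job.acl-view-job", "alice,bob analysts");
    // Only user carol (besides the job owner, cluster admins and the queue's
    // acl-administer-jobs members) may kill this job, fail its tasks or
    // change its priority.
    conf.set("mapreduce.job.acl-modify-job", "carol");
    // ... configure and submit the job as usual ...
  }
}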
 

Modified: hadoop/mapreduce/trunk/src/java/mapred-queues-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-queues-default.xml?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-queues-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-queues-default.xml Fri Sep 17 07:34:39 2010
@@ -17,13 +17,13 @@
 -->
 <!-- This is the default mapred-queues.xml file that is loaded in the case
      that the user does not have such a file on their classpath. -->
-<queues aclsEnabled="false">
+<queues>
   <queue>
     <name>default</name>
     <properties>
     </properties>
     <state>running</state>
-    <acl-submit-job>*</acl-submit-job>
-    <acl-administer-jobs>*</acl-administer-jobs>
+    <acl-submit-job> </acl-submit-job>
+    <acl-administer-jobs> </acl-administer-jobs>
   </queue>
 </queues>
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ACLsManager.java?rev=998003&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ACLsManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ACLsManager.java Fri Sep 17 07:34:39 2010
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.AuditLogger.Constants;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Manages MapReduce cluster administrators and access checks for
+ * job level operations and queue level operations.
+ * Uses JobACLsManager for access checks of job level operations and
+ * QueueManager for queue operations.
+ */
+@InterfaceAudience.Private
+class ACLsManager {
+
+  // MROwner(user who started this mapreduce cluster)'s ugi
+  private final UserGroupInformation mrOwner;
+  // members of supergroup are mapreduce cluster administrators
+  private final String superGroup;
+  
+  private final JobACLsManager jobACLsManager;
+  private final QueueManager queueManager;
+  
+  private final boolean aclsEnabled;
+
+  ACLsManager(Configuration conf, JobACLsManager jobACLsManager,
+      QueueManager queueManager) throws IOException {
+
+    mrOwner = UserGroupInformation.getCurrentUser();
+    superGroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
+    
+    aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+
+    this.jobACLsManager = jobACLsManager;
+    this.queueManager = queueManager;
+  }
+
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
+  String getSuperGroup() {
+    return superGroup;
+  }
+
+  JobACLsManager getJobACLsManager() {
+    return jobACLsManager;
+  }
+
+  /**
+   * Is the calling user an admin for the mapreduce cluster ?
+   * i.e. either cluster owner or member of the supergroup
+   *      mapreduce.cluster.permissions.supergroup.
+   * @return true, if user is an admin
+   */
+  boolean isMRAdmin(UserGroupInformation callerUGI) {
+    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+      return true;
+    }
+    String[] groups = callerUGI.getGroupNames();
+    for(int i=0; i < groups.length; ++i) {
+      if (groups[i].equals(superGroup)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, if the operation is not a job operation(for eg.
+   *  submit-job-to-queue), then allow only (a) clusterOwner(who started the
+   *  cluster), (b) cluster administrators and (c) members of
+   *  queue-submit-job-acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the job operation</li>
+   * </ul>
+   * 
+   * @param job   the job on which operation is requested
+   * @param callerUGI  the user who is requesting the operation
+   * @param operation  the operation for which authorization is needed
+   * @throws AccessControlException
+   */
+   void checkAccess(JobInProgress job, UserGroupInformation callerUGI,
+       Operation operation) throws AccessControlException {
+
+    String queue = job.getProfile().getQueueName();
+    String jobId = job.getJobID().toString();
+    JobStatus jobStatus = job.getStatus();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl =
+        jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+    checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of job acl for the jobOperation</li>
+   * </ul>
+   * 
+   * @param jobStatus  the status of the job
+   * @param callerUGI  the user who is trying to perform the operation
+   * @param queue      the job queue name
+   * @param operation  the operation for which authorization is needed
+   */
+  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+      String queue, Operation operation) throws AccessControlException {
+
+    String jobId = jobStatus.getJobID().toString();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl =
+      jobStatus.getJobACLs().get(operation.jobACLNeeded);
+
+    // If acls are enabled, check if callerUGI is jobOwner, queue admin,
+    // cluster admin or part of job ACL
+    checkAccess(jobId, callerUGI, queue, operation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, if the operation is not a job operation(for eg.
+   *  submit-job-to-queue), then allow only (a) clusterOwner(who started the
+   *  cluster), (b) cluster administrators and (c) members of
+   *  queue-submit-job-acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) cluster administrators,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the job operation</li>
+   * </ul>
+   * 
+   * @param jobId      the job id
+   * @param callerUGI  the user who is trying to perform the operation
+   * @param queue      the job queue name
+   * @param operation  the operation for which authorization is needed
+   * @param jobOwner   the user who submitted(or is submitting) this job
+   * @param jobAcl     could be job-view-acl or job-modify-acl depending on the
+   *                   job operation.
+   */
+  void checkAccess(String jobId, UserGroupInformation callerUGI,
+      String queue, Operation operation, String jobOwner,
+      AccessControlList jobAcl) throws AccessControlException {
+
+    String user = callerUGI.getShortUserName();
+    String targetResource = jobId + " in queue " + queue;
+
+    if (!aclsEnabled) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    // Allow mapreduce cluster admins to do any queue operation and
+    // any job operation
+    if (isMRAdmin(callerUGI)) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    if (operation == Operation.SUBMIT_JOB) {
+      // This is strictly queue operation(not a job operation)
+      if (!queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI)) {
+        AuditLogger.logFailure(user, operation.name(),
+            queueManager.getQueueACL(queue, operation.qACLNeeded).toString(),
+            targetResource, Constants.UNAUTHORIZED_USER);
+
+        throw new AccessControlException("User "
+            + callerUGI.getShortUserName() + " cannot perform "
+            + "operation " + operation.name() + " on queue " + queue
+            + ".\n Please run \"hadoop queue -showacls\" "
+            + "command to find the queues you have access to .");
+      } else {
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
+        return;
+      }
+    }
+
+    // Check if callerUGI is queueAdmin(in some cases only), jobOwner or
+    // part of job-acl.
+
+    // queueManager and queue are null only when called from
+    // TaskTracker(i.e. from TaskLogServlet) for the operation VIEW_TASK_LOGS.
+    // Caller of this method takes care of checking if callerUGI is a
+    // queue administrator for that operation.
+    if (operation == Operation.VIEW_TASK_LOGS) {
+      if (jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+          jobOwner, jobAcl)) {
+        AuditLogger.logSuccess(user, operation.name(), targetResource);
+        return;
+      }
+    } else if (queueManager.hasAccess(queue, operation.qACLNeeded, callerUGI) ||
+        jobACLsManager.checkAccess(callerUGI, operation.jobACLNeeded,
+            jobOwner, jobAcl)) {
+      AuditLogger.logSuccess(user, operation.name(), targetResource);
+      return;
+    }
+
+    AuditLogger.logFailure(user, operation.name(), jobAcl.toString(),
+        targetResource, Constants.UNAUTHORIZED_USER);
+
+    throw new AccessControlException("User "
+        + callerUGI.getShortUserName() + " cannot perform operation "
+        + operation.name() + " on " + jobId + " that is in the queue "
+        + queue);
+  }
+
+}
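
A caller-side sketch, not part of this commit, of the new checkAccess entry point; it mirrors the JSPUtil change further down in this patch, where JobTracker-side code authorizes a job-view operation. The sketch's class and method names are hypothetical, and it assumes the org.apache.hadoop.mapred package since ACLsManager and Operation are package-private.

package org.apache.hadoop.mapred;

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;

class AclsManagerUsageSketch {
  // Cluster admins always pass; for a job operation such as VIEW_JOB_DETAILS
  // the caller passes if it is the job owner or satisfies either the queue's
  // acl-administer-jobs or the job's view ACL; otherwise checkAccess throws.
  static void authorizeView(ACLsManager aclsManager, JobInProgress job,
      UserGroupInformation callerUGI) throws AccessControlException {
    aclsManager.checkAccess(job, callerUGI, Operation.VIEW_JOB_DETAILS);
  }
}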

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Fri Sep 17 07:34:39 2010
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.AccessControlException;
@@ -52,7 +51,7 @@ class CompletedJobStatusStore implements
   private FileSystem fs;
   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
 
-  private JobACLsManager jobACLsManager = null;
+  private ACLsManager aclsManager;
 
   public static final Log LOG =
           LogFactory.getLog(CompletedJobStatusStore.class);
@@ -62,7 +61,8 @@ class CompletedJobStatusStore implements
   final static FsPermission JOB_STATUS_STORE_DIR_PERMISSION = FsPermission
       .createImmutable((short) 0750); // rwxr-x--
 
-  CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+
+  CompletedJobStatusStore(Configuration conf, ACLsManager aclsManager)
       throws IOException {
     active =
       conf.getBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
@@ -104,7 +104,7 @@ class CompletedJobStatusStore implements
         deleteJobStatusDirs();
       }
 
-      this.jobACLsManager = aclsManager;
+      this.aclsManager = aclsManager;
 
       LOG.info("Completed job store activated/configured with retain-time : " 
                + retainTime + " , job-info-dir : " + jobInfoDir);
@@ -301,7 +301,7 @@ class CompletedJobStatusStore implements
   }
 
   /**
-   * This method retrieves Counters information from DFS stored using
+   * This method retrieves Counters information from file stored using
    * store method.
    *
    * @param jobId the jobId for which Counters is queried
@@ -315,9 +315,13 @@ class CompletedJobStatusStore implements
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
           JobStatus jobStatus = readJobStatus(dataIn);
-          jobACLsManager.checkAccess(jobStatus,
-              UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
-          readJobProfile(dataIn);
+          JobProfile profile = readJobProfile(dataIn);
+          String queue = profile.getQueueName();
+          // authorize the user for job view access
+          aclsManager.checkAccess(jobStatus,
+              UserGroupInformation.getCurrentUser(), queue,
+              Operation.VIEW_JOB_COUNTERS);
+
           counters = readCounters(dataIn);
           dataIn.close();
         }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/DeprecatedQueueConfigurationParser.java Fri Sep 17 07:34:39 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import static org.apache.hadoop.mapred.QueueManager.*;
@@ -46,7 +47,7 @@ class DeprecatedQueueConfigurationParser
       return;
     }
     List<Queue> listq = createQueues(conf);
-    this.setAclsEnabled(conf.getBoolean("mapred.acls.enabled", false));
+    this.setAclsEnabled(conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false));
     root = new Queue();
     root.setName("");
     for (Queue q : listq) {
@@ -78,9 +79,8 @@ class DeprecatedQueueConfigurationParser
    */
   private QueueState getQueueState(String name, Configuration conf) {
     String stateVal = conf.get(
-      QueueManager.toFullPropertyName(
-        name,"state"),
-      QueueState.RUNNING.getStateName());
+        toFullPropertyName(name, "state"),
+        QueueState.RUNNING.getStateName());
     return QueueState.getState(stateVal);
   }
 
@@ -105,21 +105,11 @@ class DeprecatedQueueConfigurationParser
       queues = conf.getStrings(MAPRED_QUEUE_NAMES_KEY);
     }
 
-    // check if the acls flag is defined
-    String aclsEnable = conf.get("mapred.acls.enabled");
-    if (aclsEnable != null) {
-      LOG.warn(
-        "Configuring \"mapred.acls.enabled\" in mapred-site.xml or " +
-          "hadoop-site.xml is deprecated. Configure " +
-          "queue hierarchy in " +
-          QUEUE_CONF_FILE_NAME);
-    }
-
     // check if acls are defined
     if (queues != null) {
       for (String queue : queues) {
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = toFullPropertyName(queue, oper.getAclName());
+        for (QueueACL qAcl : QueueACL.values()) {
+          String key = toFullPropertyName(queue, qAcl.getAclName());
           String aclString = conf.get(key);
           if (aclString != null) {
             LOG.warn(
@@ -149,8 +139,8 @@ class DeprecatedQueueConfigurationParser
     Configuration conf) {
     HashMap<String, AccessControlList> map =
       new HashMap<String, AccessControlList>();
-    for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-      String aclKey = toFullPropertyName(name, oper.getAclName());
+    for (QueueACL qAcl : QueueACL.values()) {
+      String aclKey = toFullPropertyName(name, qAcl.getAclName());
       map.put(
         aclKey, new AccessControlList(
           conf.get(

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java Fri Sep 17 07:34:39 2010
@@ -95,14 +95,14 @@ class JSPUtil {
    *         and decide if view should be allowed or not. Job will be null if
    *         the job with given jobid doesnot exist at the JobTracker.
    */
-  public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+  public static JobWithViewAccessCheck checkAccessAndGetJob(final JobTracker jt,
       JobID jobid, HttpServletRequest request, HttpServletResponse response)
       throws ServletException, IOException {
     final JobInProgress job = jt.getJob(jobid);
     JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
 
     String user = request.getRemoteUser();
-    if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+    if (user != null && job != null && jt.areACLsEnabled()) {
       final UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(user);
       try {
@@ -110,7 +110,8 @@ class JSPUtil {
           public Void run() throws IOException, ServletException {
 
             // checks job view permission
-            job.checkAccess(ugi, JobACL.VIEW_JOB);
+            jt.getACLsManager().checkAccess(job, ugi,
+                Operation.VIEW_JOB_DETAILS);
             return null;
           }
         });
@@ -475,10 +476,10 @@ class JSPUtil {
    * Read a job-history log file and construct the corresponding {@link JobInfo}
    * . Also cache the {@link JobInfo} for quick serving further requests.
    * 
-   * @param logFile
-   * @param fs
-   * @param jobTracker
-   * @return JobInfo
+   * @param logFile      the job history log file
+   * @param fs           job tracker file system
+   * @param jobTracker   the job tracker
+   * @return JobInfo     job's basic information
    * @throws IOException
    */
   static JobInfo getJobInfo(Path logFile, FileSystem fs,
@@ -506,20 +507,18 @@ class JSPUtil {
       }
     }
 
-    jobTracker.getJobACLsManager().checkAccess(JobID.forName(jobid),
-        UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB,
-        jobInfo.getUsername(), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
     return jobInfo;
   }
 
   /**
-   * Check the access for users to view job-history pages.
+   * Check the access for users to view job-history pages and return
+   * {@link JobInfo}.
    * 
-   * @param request
-   * @param response
-   * @param jobTracker
-   * @param fs
-   * @param logFile
+   * @param request     http servlet request
+   * @param response    http servlet response
+   * @param jobTracker  the job tracker
+   * @param fs          job tracker file system
+   * @param logFile     the job history log file
    * @return the job if authorization is disabled or if the authorization checks
    *         pass. Otherwise return null.
    * @throws IOException
@@ -533,19 +532,24 @@ class JSPUtil {
     String jobid =
         JobHistory.getJobIDFromHistoryFilePath(logFile).toString();
     String user = request.getRemoteUser();
-    JobInfo job = null;
+
+    JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
     if (user != null) {
+      // authorize user for job-view access
       try {
         final UserGroupInformation ugi =
             UserGroupInformation.createRemoteUser(user);
-        job =
-            ugi.doAs(new PrivilegedExceptionAction<JobHistoryParser.JobInfo>() {
-              public JobInfo run() throws IOException {
-                // checks job view permission
-                JobInfo jobInfo = JSPUtil.getJobInfo(logFile, fs, jobTracker);
-                return jobInfo;
-              }
-            });
+        
+        AccessControlList viewJobAcl = jobInfo.getJobACLs().get(JobACL.VIEW_JOB);
+        if (viewJobAcl == null) {
+          // may be older job history file of earlier unsecure cluster
+          viewJobAcl = new AccessControlList("*");
+        }
+
+        jobTracker.getACLsManager().checkAccess(jobid, ugi,
+            jobInfo.getJobQueueName(), Operation.VIEW_JOB_DETAILS,
+            jobInfo.getUsername(), viewJobAcl);
+
       } catch (AccessControlException e) {
         String errMsg =
             String.format(
@@ -557,11 +561,9 @@ class JSPUtil {
         JSPUtil.setErrorAndForward(errMsg, request, response);
         return null;
       }
-    } else {
-      // no authorization needed
-      job = JSPUtil.getJobInfo(logFile, fs, jobTracker);
-    }
-    return job;
+    } // else { no authorization needed }
+
+    return jobInfo;
   }
 
   /**
@@ -574,7 +576,7 @@ class JSPUtil {
   static void printJobACLs(JobTracker tracker,
       Map<JobACL, AccessControlList> jobAcls, JspWriter out)
       throws IOException {
-    if (tracker.isJobLevelAuthorizationEnabled()) {
+    if (tracker.areACLsEnabled()) {
       // Display job-view-acls and job-modify-acls configured for this job
       out.print("<b>Job-ACLs:</b><br>");
       for (JobACL aclName : JobACL.values()) {
@@ -587,5 +589,9 @@ class JSPUtil {
         }
       }
     }
+    else {
+      out.print("<b>Job-ACLs: " + new AccessControlList("*").toString()
+          + "</b><br>");
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java Fri Sep 17 07:34:39 2010
@@ -20,26 +20,25 @@ package org.apache.hadoop.mapred;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.mapred.AuditLogger.Constants;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
 @InterfaceAudience.Private
-public abstract class JobACLsManager {
+class JobACLsManager {
 
-  static final Log LOG = LogFactory.getLog(JobACLsManager.class);
+  JobConf conf;
 
-  public static final String UNAUTHORIZED_JOB_ACCESS_ERROR =
-      " is not authorized for performing the operation ";
-  protected abstract boolean isJobLevelAuthorizationEnabled();
+  public JobACLsManager(JobConf conf) {
+    this.conf = conf;
+  }
 
-  protected abstract boolean isSuperUserOrSuperGroup(
-      UserGroupInformation callerUGI);
+  boolean areACLsEnabled() {
+    return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
+  }
 
   /**
    * Construct the jobACLs from the configuration so that they can be kept in
@@ -54,7 +53,7 @@ public abstract class JobACLsManager {
         new HashMap<JobACL, AccessControlList>();
 
     // Don't construct anything if authorization is disabled.
-    if (!isJobLevelAuthorizationEnabled()) {
+    if (!areACLsEnabled()) {
       return acls;
     }
 
@@ -64,7 +63,7 @@ public abstract class JobACLsManager {
       if (aclConfigured == null) {
         // If ACLs are not configured at all, we grant no access to anyone. So
         // jobOwner and superuser/supergroup _only_ can do 'stuff'
-        aclConfigured = "";
+        aclConfigured = " ";
       }
       acls.put(aclName, new AccessControlList(aclConfigured));
     }
@@ -72,69 +71,34 @@ public abstract class JobACLsManager {
   }
 
   /**
-   * If authorization is enabled, checks whether the user (in the callerUGI) is
-   * authorized to perform the operation specified by 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup is always permitted to do operations on any
-   * job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * 
-   * @param jobStatus
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
-      JobACL jobOperation) throws AccessControlException {
-
-    JobID jobId = jobStatus.getJobID();
-    String jobOwner = jobStatus.getUsername();
-    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
-    checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
-  }
-
-  /**
-   * If authorization is enabled, checks whether the user (in the callerUGI) is
-   * authorized to perform the operation specified by 'jobOperation' on the job.
+   * If authorization is enabled, checks whether the user (in the callerUGI)
+   * is authorized to perform the operation specified by 'jobOperation' on
+   * the job by checking if the user is jobOwner or part of job ACL for the
+   * specific job operation.
    * <ul>
    * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup is always permitted to do operations on any
-   * job.</li>
    * <li>For all other users/groups job-acls are checked</li>
    * </ul>
-   * @param jobId
    * @param callerUGI
    * @param jobOperation
    * @param jobOwner
    * @param jobACL
    * @throws AccessControlException
    */
-  void checkAccess(JobID jobId, UserGroupInformation callerUGI,
-      JobACL jobOperation, String jobOwner, AccessControlList jobACL)
-      throws AccessControlException {
+  boolean checkAccess(UserGroupInformation callerUGI,
+      JobACL jobOperation, String jobOwner, AccessControlList jobACL) {
 
     String user = callerUGI.getShortUserName();
-    if (!isJobLevelAuthorizationEnabled()) {
-      return;
+    if (!areACLsEnabled()) {
+      return true;
     }
 
-    // Allow uperusers/supergroups
-    // Allow Job-owner as job's owner is always part of all the ACLs
-    if (callerUGI.getShortUserName().equals(jobOwner)
-        || isSuperUserOrSuperGroup(callerUGI)
+    // Allow Job-owner for any operation on the job
+    if (user.equals(jobOwner)
         || jobACL.isUserAllowed(callerUGI)) {
-      AuditLogger.logSuccess(user, jobOperation.name(),  jobId.toString());
-      return;
+      return true;
     }
 
-    AuditLogger.logFailure(user, jobOperation.name(), null, jobId.toString(),
-                           Constants.UNAUTHORIZED_USER);
-    throw new AccessControlException(callerUGI
-        + UNAUTHORIZED_JOB_ACCESS_ERROR
-        + jobOperation.toString() + " on " + jobId + ". "
-        + jobOperation.toString()
-        + " Access control list configured for this job : "
-        + jobACL.toString());
+    return false;
   }
 }
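
With the old job-level flag gone, the manager above honours job ACLs only when
MRConfig.MR_ACLS_ENABLED is set, and an unset ACL now defaults to the
single-space string, i.e. nobody beyond the job owner and the cluster admins.
A minimal client-side sketch of wiring this up, assuming JobACL.getAclName()
is the per-operation configuration key and using made-up user/group names:

// Sketch (not part of this patch): enabling cluster ACLs and setting the
// job-level ACLs that constructJobACLs() above will read.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;

public class JobAclConfSketch {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    // Single switch that now guards both job ACLs and queue ACLs.
    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
    // "users groups" form: user bob plus anyone in group qa may view the job.
    conf.set(JobACL.VIEW_JOB.getAclName(), "bob qa");
    // A single space means nobody (besides the owner and cluster admins).
    conf.set(JobACL.MODIFY_JOB.getAclName(), " ");
    return conf;
  }
}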

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 17 07:34:39 2010
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -43,7 +42,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +49,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -79,13 +76,9 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -432,7 +425,7 @@ public class JobInProgress {
         String desc = "The username " + conf.getUser() + " obtained from the "
             + "conf doesn't match the username " + user + " the user "
             + "authenticated as";
-        AuditLogger.logFailure(user, Queue.QueueOperation.SUBMIT_JOB.name(),
+        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
             conf.getUser(), jobId.toString(), desc);
         throw new IOException(desc);
       }
@@ -724,12 +717,13 @@ public class JobInProgress {
     String username = conf.getUser();
     if (username == null) { username = ""; }
     String jobname = conf.getJobName();
-    if (jobname == null) { jobname = ""; }
+    String jobQueueName = conf.getQueueName();
+
     setUpLocalizedJobConf(conf, jobId);
     jobHistory.setupEventWriter(jobId, conf);
     JobSubmittedEvent jse =
         new JobSubmittedEvent(jobId, jobname, username, this.startTime,
-            jobFile.toString(), status.getJobACLs());
+            jobFile.toString(), status.getJobACLs(), jobQueueName);
     jobHistory.logEvent(jse, jobId);
     
   }
@@ -742,25 +736,6 @@ public class JobInProgress {
   }
 
   /**
-   * If authorization is enabled on the JobTracker, checks whether the user (in
-   * the callerUGI) is authorized to perform the operation specify by
-   * 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup of the JobTracker is always permitted to do
-   * operations on any job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * 
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
-      throws AccessControlException {
-    jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
-  }
-
-  /**
    * If the number of taks is greater than the configured value
    * throw an exception that will fail job initialization
    */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Sep 17 07:34:39 2010
@@ -77,7 +77,6 @@ import org.apache.hadoop.mapred.JobStatu
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -105,6 +104,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -1289,7 +1289,6 @@ public class JobTracker implements MRCon
                                                 "expireLaunchingTasks");
 
   final CompletedJobStatusStore completedJobStatusStore;
-  private JobTrackerJobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   final RecoveryManager recoveryManager;
 
@@ -1330,8 +1329,8 @@ public class JobTracker implements MRCon
   FileSystem fs = null;
   Path systemDir = null;
   JobConf conf;
-  private final UserGroupInformation mrOwner;
-  private final String supergroup;
+
+  private final ACLsManager aclsManager;
 
   long limitMaxMemForMapTasks;
   long limitMaxMemForReduceTasks;
@@ -1347,7 +1346,7 @@ public class JobTracker implements MRCon
     retiredJobsCacheSize = 0;
     infoServer = null;
     queueManager = null;
-    supergroup = null;
+    aclsManager = null;
     taskScheduler = null;
     trackerIdentifier = null;
     recoveryManager = null;
@@ -1355,7 +1354,6 @@ public class JobTracker implements MRCon
     completedJobStatusStore = null;
     tasktrackerExpiryInterval = 0;
     myInstrumentation = new JobTrackerMetricsInst(this, new JobConf());
-    mrOwner = null;
     secretManager = null;
     localFs = null;
   }
@@ -1382,11 +1380,7 @@ public class JobTracker implements MRCon
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
         localMachine);
-    mrOwner = UserGroupInformation.getCurrentUser();
-    
-    supergroup = conf.get(MR_SUPERGROUP, "supergroup");
-    LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName() 
-             + " and supergroup as " + supergroup);
+
     clock = newClock;
     
     long secretKeyInterval = 
@@ -1443,9 +1437,15 @@ public class JobTracker implements MRCon
     this.hostsReader = new HostsFileReader(conf.get(JTConfig.JT_HOSTS_FILENAME, ""),
                                            conf.get(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, ""));
 
-    Configuration queuesConf = new Configuration(this.conf);
-    queueManager = new QueueManager(queuesConf);
+    Configuration clusterConf = new Configuration(this.conf);
+    queueManager = new QueueManager(clusterConf);
     
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+    LOG.info("Starting jobtracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
+
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass(JT_TASK_SCHEDULER,
@@ -1526,7 +1526,7 @@ public class JobTracker implements MRCon
       try {
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
-          fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
             public FileSystem run() throws IOException {
               return FileSystem.get(conf);
           }});
@@ -1538,9 +1538,10 @@ public class JobTracker implements MRCon
         }
         try {
           FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(mrOwner.getShortUserName())) {
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
             throw new AccessControlException("The systemdir " + systemDir + 
-                " is not owned by " + mrOwner.getShortUserName());
+                " is not owned by " + getMROwner().getShortUserName());
           }
           if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
             LOG.warn("Incorrect permissions on " + systemDir + 
@@ -1607,7 +1608,8 @@ public class JobTracker implements MRCon
     final String historyLogDir = 
       jobHistory.getCompletedJobHistoryLocation().toString();
     infoServer.setAttribute("historyLogDir", historyLogDir);
-    FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
       public FileSystem run() throws IOException {
         return new Path(historyLogDir).getFileSystem(conf);
       }
@@ -1620,10 +1622,8 @@ public class JobTracker implements MRCon
     this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS, 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    // Initialize the jobACLSManager
-    jobACLsManager = new JobTrackerJobACLsManager(this);
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -3056,7 +3056,7 @@ public class JobTracker implements MRCon
         throw ioe;
       }
       try {
-        checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+        aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
       } catch (AccessControlException ace) {
         LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ace);
@@ -3109,19 +3109,8 @@ public class JobTracker implements MRCon
     LOG.info("Job " + jobId + " added successfully for user '" 
              + job.getJobConf().getUser() + "' to queue '" 
              + job.getJobConf().getQueueName() + "'");
-    AuditLogger.logSuccess(job.getUser(),
-        Queue.QueueOperation.SUBMIT_JOB.name(), jobId.toString());
-    return job.getStatus();
-  }
 
-  /**
-   * Is job-level authorization enabled on the JT?
-   * 
-   * @return
-   */
-  boolean isJobLevelAuthorizationEnabled() {
-    return conf.getBoolean(
-        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+    return job.getStatus();
   }
 
   /**
@@ -3143,45 +3132,12 @@ public class JobTracker implements MRCon
   }
 
   /**
-   * Check the ACLs for a user doing the passed queue-operation and the passed
-   * job operation.
-   * <ul>
-   * <li>Superuser/supergroup can do any operation on the job</li>
-   * <li>For any other user/group, the configured ACLs for the corresponding
-   * queue and the job are checked.</li>
-   * </ul>
-   * 
-   * @param job
-   * @param callerUGI
-   * @param oper
-   * @param jobOperation
-   * @throws AccessControlException
-   * @throws IOException
+   * Are ACLs for authorization checks enabled on the MR cluster?
+   *
+   * @return true if ACLs (job ACLs and queue ACLs) are enabled
    */
-  private void checkAccess(JobInProgress job,
-      UserGroupInformation callerUGI, Queue.QueueOperation oper,
-      JobACL jobOperation) throws AccessControlException {
-
-    // get the queue and verify the queue access
-    String queue = job.getProfile().getQueueName();
-    if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
-      throw new AccessControlException("User " 
-                            + callerUGI.getShortUserName() 
-                            + " cannot perform "
-                            + "operation " + oper + " on queue " + queue +
-                            ".\n Please run \"hadoop queue -showacls\" " +
-                            "command to find the queues you have access" +
-                            " to .");
-    }
-
-    // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
-    // job itself isn't created by that time.
-    if (jobOperation == null) {
-      return;
-    }
-
-    // check the access to the job
-    job.checkAccess(callerUGI, jobOperation);
+  boolean areACLsEnabled() {
+    return conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
   }
 
   /**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3295,8 +3251,8 @@ public class JobTracker implements MRCon
     }
 
     // check both queue-level and job-level access
-    checkAccess(job, UserGroupInformation.getCurrentUser(),
-        Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+    aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+        Operation.KILL_JOB);
 
     killJob(job);
   }
@@ -3530,8 +3486,8 @@ public class JobTracker implements MRCon
       JobInProgress job = jobs.get(oldJobID);
       if (job != null) {
 	// check the job-access
-        job.checkAccess(UserGroupInformation.getCurrentUser(),
-            JobACL.VIEW_JOB);
+        aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+            Operation.VIEW_JOB_COUNTERS);
 
         if (!isJobInited(job)) {
 	  return EMPTY_COUNTERS;
@@ -3703,8 +3659,8 @@ public class JobTracker implements MRCon
     // Check authorization
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
-          JobACL.VIEW_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
     } else { 
       return EMPTY_TASK_REPORTS;
     }
@@ -3779,8 +3735,8 @@ public class JobTracker implements MRCon
     if (job != null) {
 
       // check the access to the job.
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
-          JobACL.VIEW_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.VIEW_JOB_DETAILS);
 
       if (isJobInited(job)) {
         TaskInProgress tip = job.getTaskInProgress(tipId);
@@ -3851,8 +3807,9 @@ public class JobTracker implements MRCon
     if (tip != null) {
 
       // check both queue-level and job-level access
-      checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
-          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+      aclsManager.checkAccess(tip.getJob(),
+          UserGroupInformation.getCurrentUser(),
+          shouldFail ? Operation.FAIL_TASK : Operation.KILL_TASK);
 
       return tip.killTask(taskid, shouldFail);
     }
@@ -3899,8 +3856,9 @@ public class JobTracker implements MRCon
    */
   public String getStagingAreaDir() throws IOException {
     try {
-      final String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+      final String user =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      return getMROwner().doAs(new PrivilegedExceptionAction<String>() {
         @Override
         public String run() throws Exception {
           Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
@@ -3923,6 +3881,18 @@ public class JobTracker implements MRCon
     return jobHistory.getCompletedJobHistoryLocation().toString();
   }
 
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    AccessControlList acl =
+        queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS);
+    if (acl == null) {
+      acl = new AccessControlList(" ");
+    }
+    return acl;
+  }
+
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
@@ -3954,8 +3924,8 @@ public class JobTracker implements MRCon
     if (job != null) {
 
       // check both queue-level and job-level access
-      checkAccess(job, UserGroupInformation.getCurrentUser(),
-          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Operation.SET_JOB_PRIORITY);
 
       synchronized (taskScheduler) {
         JobStatus oldStatus = (JobStatus)job.getStatus().clone();
@@ -4136,24 +4106,6 @@ public class JobTracker implements MRCon
       removeMarkedTasks(trackerName);
     }
   }
-  
-  /**
-   * Is the calling user a super user? Or part of the supergroup?
-   * @return true, if it is a super user
-   */
-  static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
-      UserGroupInformation superUser, String superGroup) {
-    if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
-      return true;
-    }
-    String[] groups = callerUGI.getGroupNames();
-    for(int i=0; i < groups.length; ++i) {
-      if (groups[i].equals(superGroup)) {
-        return true;
-      }
-    }
-    return false;
-  }
 
   /**
    * Rereads the config to get hosts and exclude list file names.
@@ -4162,10 +4114,9 @@ public class JobTracker implements MRCon
   public synchronized void refreshNodes() throws IOException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     // check access
-    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
-                                 supergroup)) {
+    if (!isMRAdmin(UserGroupInformation.getCurrentUser())) {
       AuditLogger.logFailure(user, Constants.REFRESH_NODES,
-          mrOwner + " " + supergroup, Constants.JOBTRACKER,
+          getMROwner() + " " + getSuperGroup(), Constants.JOBTRACKER,
           Constants.UNAUTHORIZED_USER);
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4175,15 +4126,19 @@ public class JobTracker implements MRCon
     // call the actual api
     refreshHosts();
   }
-  
+
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   private synchronized void refreshHosts() throws IOException {
     // Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
     // Update the file names and refresh internal includes and excludes list
@@ -4260,8 +4215,8 @@ public class JobTracker implements MRCon
         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
           dumpConfiguration(new PrintWriter(System.out));
           System.out.println();
-          QueueManager.dumpConfiguration(new PrintWriter(System.out),
-              new JobConf());
+          Configuration conf = new Configuration();
+          QueueManager.dumpConfiguration(new PrintWriter(System.out), conf);
         }
         else {
           System.out.println("usage: JobTracker [-dumpConfiguration]");
@@ -4612,16 +4567,20 @@ public class JobTracker implements MRCon
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
         localMachine);
-    mrOwner = UserGroupInformation.getCurrentUser();
-    supergroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
     
     secretManager = null;
     
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
         conf.get("mapred.hosts.exclude", ""));
     // queue manager
-    Configuration queuesConf = new Configuration(this.conf);
-    queueManager = new QueueManager(queuesConf);
+    Configuration clusterConf = new Configuration(this.conf);
+    queueManager = new QueueManager(clusterConf);
+
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+    LOG.info("Starting jobtracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
 
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
@@ -4646,7 +4605,7 @@ public class JobTracker implements MRCon
     jobHistory = new JobHistory();
     final JobTracker jtFinal = this;
     try {
-      historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      historyFS = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
           jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime);
           jobHistory.initDone(conf, fs);
@@ -4690,11 +4649,8 @@ public class JobTracker implements MRCon
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    // Initialize the jobACLSManager
-    jobACLsManager = new JobTrackerJobACLsManager(this);
-
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   /**
@@ -4756,9 +4712,13 @@ public class JobTracker implements MRCon
   }
 
   JobACLsManager getJobACLsManager() {
-    return jobACLsManager;
+    return aclsManager.getJobACLsManager();
   }
-  
+
+  ACLsManager getACLsManager() {
+    return aclsManager;
+  }
+
   /**
    * 
    * @return true if delegation token operation is allowed
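
The call sites above replace the old queue-operation/job-ACL pair with a
single Operation handed to aclsManager.checkAccess(). ACLsManager itself is
added in another part of this commit, so the following is only a rough sketch
of the decision those call sites imply, pieced together from the Operation
enum added below and the old checkAccess() removed above; it omits the admin
and ACLs-disabled short-circuits, and the method and parameter names are
illustrative:

// Approximation only; not the ACLsManager added by this commit.
import org.apache.hadoop.mapred.Operation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;

class OperationCheckSketch {
  /** Returns true if caller may perform op on a job owned by jobOwner. */
  static boolean mayPerform(Operation op, UserGroupInformation caller,
      String jobOwner, AccessControlList queueACL, AccessControlList jobACL) {
    // Queue-level ACL named by op.qACLNeeded, e.g. acl-administer-jobs.
    if (queueACL != null && queueACL.isUserAllowed(caller)) {
      return true;
    }
    // SUBMIT_JOB carries no job-level ACL: the job does not exist yet.
    if (op.jobACLNeeded == null) {
      return false;
    }
    // Job-level check: the owner is always allowed; otherwise the job ACL
    // configured for op.jobACLNeeded (VIEW_JOB or MODIFY_JOB) decides.
    return caller.getShortUserName().equals(jobOwner)
        || jobACL.isUserAllowed(caller);
  }
}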

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 17 07:34:39 2010
@@ -51,7 +51,6 @@ import org.apache.hadoop.mapreduce.filec
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -59,6 +58,7 @@ import org.apache.hadoop.mapreduce.serve
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */
@@ -698,6 +698,13 @@ public class LocalJobRunner implements C
   }
 
   /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    return new AccessControlList(" "); // no queue admins for local job runner
+  }
+
+  /**
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
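
Both this stub and JobTracker.getQueueAdmins() fall back to
new AccessControlList(" "). A small sketch of how the ACL strings used
throughout this patch behave; the users and groups below are made up, created
only for the check:

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;

public class AclStringSketch {
  public static void main(String[] args) {
    UserGroupInformation bob =
        UserGroupInformation.createUserForTesting("bob", new String[] {"qa"});

    // "users groups" form: user alice, or anyone in the group qa, is allowed.
    System.out.println(new AccessControlList("alice qa").isUserAllowed(bob)); // true
    // "*" is the wildcard: everyone is allowed.
    System.out.println(new AccessControlList("*").isUserAllowed(bob));        // true
    // A single space yields empty user and group lists: nobody is allowed,
    // which is why it is the fallback for unset ACLs and queue admins here.
    System.out.println(new AccessControlList(" ").isUserAllowed(bob));        // false
  }
}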

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Operation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Operation.java?rev=998003&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Operation.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Operation.java Fri Sep 17 07:34:39 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobACL;
+
+/**
+ * Generic operation that maps to the set of ACLs that drive the
+ * authorization of that operation.
+ */
+@InterfaceAudience.Private
+public enum Operation {
+  VIEW_JOB_COUNTERS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_JOB_DETAILS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  VIEW_TASK_LOGS(QueueACL.ADMINISTER_JOBS, JobACL.VIEW_JOB),
+  KILL_JOB(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  FAIL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  KILL_TASK(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SET_JOB_PRIORITY(QueueACL.ADMINISTER_JOBS, JobACL.MODIFY_JOB),
+  SUBMIT_JOB(QueueACL.SUBMIT_JOB, null);
+  
+  public QueueACL qACLNeeded;
+  public JobACL jobACLNeeded;
+  
+  Operation(QueueACL qACL, JobACL jobACL) {
+    this.qACLNeeded = qACL;
+    this.jobACLNeeded = jobACL;
+  }
+}
\ No newline at end of file
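
A quick way to see the mapping this enum encodes is to walk its values. The
snippet below is illustration only and relies solely on the public fields
declared above plus QueueACL.getAclName():

import org.apache.hadoop.mapred.Operation;

public class OperationMapDump {
  public static void main(String[] args) {
    for (Operation op : Operation.values()) {
      System.out.println(op + " -> queue ACL: " + op.qACLNeeded.getAclName()
          + ", job ACL: "
          + (op.jobACLNeeded == null ? "(none)" : op.jobACLNeeded.toString()));
    }
    // e.g. KILL_TASK  -> queue ACL: acl-administer-jobs, job ACL: MODIFY_JOB
    //      SUBMIT_JOB -> queue ACL: acl-submit-job,      job ACL: (none)
  }
}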

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java?rev=998003&r1=998002&r2=998003&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Queue.java Fri Sep 17 07:34:39 2010
@@ -42,8 +42,7 @@ class Queue implements Comparable<Queue>
   private String name = null;
 
   //acls list
-  private Map<String, 
-              org.apache.hadoop.security.authorize.AccessControlList> acls;
+  private Map<String, AccessControlList> acls;
 
   //Queue State
   private QueueState state = QueueState.RUNNING;
@@ -59,34 +58,6 @@ class Queue implements Comparable<Queue>
   private Properties props;
 
   /**
-   * Enum representing an operation that can be performed on a queue.
-   */
-  static enum QueueOperation {
-    SUBMIT_JOB ("acl-submit-job", false),
-    ADMINISTER_JOBS ("acl-administer-jobs", true);
-    // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
-    //       users in UI
-    // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
-    //       configuring queues.
-
-    private final String aclName;
-    private final boolean jobOwnerAllowed;
-
-    QueueOperation(String aclName, boolean jobOwnerAllowed) {
-      this.aclName = aclName;
-      this.jobOwnerAllowed = jobOwnerAllowed;
-    }
-
-    final String getAclName() {
-      return aclName;
-    }
-
-    final boolean isJobOwnerAllowed() {
-      return jobOwnerAllowed;
-    }
-  }
-
-  /**
    * Default constructor is useful in creating the hierarchy.
    * The variables are populated using mutator methods.
    */
@@ -133,7 +104,7 @@ class Queue implements Comparable<Queue>
    * @return Map containing the operations that can be performed and
    *          who can perform the operations.
    */
-  Map<String, org.apache.hadoop.security.authorize.AccessControlList> getAcls() {
+  Map<String, AccessControlList> getAcls() {
     return acls;
   }
   

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueACL.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueACL.java?rev=998003&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueACL.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/QueueACL.java Fri Sep 17 07:34:39 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Enum representing an AccessControlList that drives the set of operations
+ * that can be performed on a queue.
+ */
+@InterfaceAudience.Private
+public enum QueueACL {
+  SUBMIT_JOB ("acl-submit-job"),
+  ADMINISTER_JOBS ("acl-administer-jobs");
+  // Currently this ACL acl-administer-jobs is checked for the operations
+  // FAIL_TASK, KILL_TASK, KILL_JOB, SET_JOB_PRIORITY and VIEW_JOB.
+
+  // TODO: Add ACL for LIST_JOBS when we have ability to authenticate
+  //       users in UI
+  // TODO: Add ACL for CHANGE_ACL when we have an admin tool for
+  //       configuring queues.
+
+  private final String aclName;
+
+  QueueACL(String aclName) {
+    this.aclName = aclName;
+  }
+
+  public final String getAclName() {
+    return aclName;
+  }
+}
\ No newline at end of file
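
The aclName here is the suffix that, in the deprecated mapred-site style of
queue configuration, is appended to mapred.queue.<queue-name>. to form the
full property key; the new mapred-queues.xml format nests the same acl-*
names as per-queue elements instead. A small sketch of that key construction,
with the helper name being illustrative:

import org.apache.hadoop.mapred.QueueACL;

public class QueueAclKeySketch {
  // Illustrative helper, not part of the patch.
  static String toFullPropertyName(String queue, QueueACL acl) {
    return "mapred.queue." + queue + "." + acl.getAclName();
  }

  public static void main(String[] args) {
    // Prints: mapred.queue.default.acl-submit-job
    System.out.println(toFullPropertyName("default", QueueACL.SUBMIT_JOB));
    // Prints: mapred.queue.default.acl-administer-jobs
    System.out.println(toFullPropertyName("default", QueueACL.ADMINISTER_JOBS));
  }
}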