You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:13:27 UTC

svn commit: r1077423 [1/2] - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/docs/src/documentation/content/xdocs/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 04:13:26 2011
New Revision: 1077423

URL: http://svn.apache.org/viewvc?rev=1077423&view=rev
Log:
commit 7486ae2d1d5b7d607dc1ec621e15fd08a971663f
Author: Vinod Kumar <vi...@yahoo-inc.com>
Date:   Sat Apr 24 00:22:59 2010 +0530

    MAPREDUCE:1664. From https://issues.apache.org/jira/secure/attachment/12442697/1664.20S.3.4.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1664. Job Acls affect when Queue Acls are set.
    +    (Ravi Gummadi via vinodkv)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ACLsManager.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/conf/mapred-queue-acls.xml.template
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobACLsManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobACLs.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/mapred-queue-acls.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/mapred-queue-acls.xml.template?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/mapred-queue-acls.xml.template (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/mapred-queue-acls.xml.template Fri Mar  4 04:13:26 2011
@@ -7,24 +7,40 @@
 
 <property>
   <name>mapred.queue.default.acl-submit-job</name>
-  <value>*</value>
+  <value> </value>
   <description> Comma separated list of user and group names that are allowed
     to submit jobs to the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
+    are separated by a blank. For e.g. user1,user2 group1,group2. 
     If set to the special value '*', it means all users are allowed to 
-    submit jobs. 
+    submit jobs. If set to ' '(i.e. space), no user will be allowed to submit
+    jobs.
+
+    It is only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapred.acls.enabled to true.
+
+    Irrespective of this ACL configuration, the user who started the cluster
+    and members of the supergroup configured on the JobTracker via
+    mapred.permissions.supergroup can always submit jobs.
   </description>
 </property>
 
 <property>
   <name>mapred.queue.default.acl-administer-jobs</name>
-  <value>*</value>
+  <value> </value>
   <description> Comma separated list of user and group names that are allowed
-    to delete jobs or modify job's priority for jobs not owned by the current
-    user in the 'default' queue. The user list and the group list
-    are separated by a blank. For e.g. alice,bob group1,group2. 
+    to delete jobs or modify job's priority for all the jobs
+    in the 'default' queue. The user list and the group list
+    are separated by a blank. For e.g. user1,user2 group1,group2. 
     If set to the special value '*', it means all users are allowed to do 
+    this operation. If set to ' '(i.e. space), no user will be allowed to do
     this operation.
+
+    It is only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapred.acls.enabled to true.
+
+    Irrespective of this ACL configuration, the user who started the cluster
+    and members of the supergroup configured on the JobTracker via
+    mapred.permissions.supergroup can always do this operation.
   </description>
 </property>
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar  4 04:13:26 2011
@@ -294,27 +294,22 @@
         </tr>
         <tr>
           <td>mapred.acls.enabled</td>
-          <td>Boolean, specifying whether queue ACLs are supported for 
-              authorizing job submission and job administration in a 
-              queue</td>
-          <td>
-            If <em>true</em>, queue ACLs would be checked while submitting
-            and administering jobs. ACLs can be specified using the
-            configuration parameters of the form
-            <em>mapred.queue.queue-name.acl-name</em>, defined below.
+          <td>Boolean, specifying whether checks for queue ACLs and job ACLs
+            are to be done for authorizing users for doing queue operations and
+            job operations.
           </td>
-        </tr>
-        <tr>
-          <td>mapreduce.cluster.job-authorization-enabled</td>
-          <td>Boolean, specifying whether job ACLs are supported for 
-              authorizing view and modification of a job</td>
           <td>
-            If <em>true</em>, job ACLs would be checked while viewing or
-            modifying a job. More details are available at 
-            <a href ="mapred_tutorial.html#Job+Authorization">Job Authorization</a>. 
+            If <em>true</em>, queue ACLs are checked while submitting
+            and administering jobs and job ACLs are checked for authorizing
+            view and modification of jobs. Queue ACLs are specified using the
+            configuration parameters of the form
+            <em>mapred.queue.queue-name.acl-name</em>, defined below under
+            mapred-queue-acls.xml. Job ACLs are described at 
+            <a href ="mapred_tutorial.html#Job+Authorization">Job Authorization
+            </a>
           </td>
         </tr>
-		  </table>
+        </table>
       
       <p><br/><code> conf/mapred-queue-acls.xml</code></p>
       
@@ -337,7 +332,7 @@
           </td>
         </tr>
         <tr>
-          <td>mapred.queue.<em>queue-name</em>.acl-administer-job</td>
+          <td>mapred.queue.<em>queue-name</em>.acl-administer-jobs</td>
           <td>List of users and groups that can change the priority
               or kill jobs that have been submitted to the
               specified <em>queue-name</em>.</td>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Fri Mar  4 04:13:26 2011
@@ -1508,12 +1508,14 @@
         
         <section>
           <title>Job Authorization</title>
-          <p>Job level authorization is enabled on the cluster, if the configuration
-          <code>mapreduce.cluster.job-authorization-enabled</code> is set to
-          true. When enabled, access control checks are done by the JobTracker
-          and the TaskTracker before allowing users to view
-          job details or to modify a job using MapReduce APIs,
-          CLI or web user interfaces.</p>
+          <p>Job level authorization and queue level authorization are enabled
+          on the cluster, if the configuration
+          <code>mapred.acls.enabled</code> is set to
+          true. When enabled, access control checks are done by (a) the
+          JobTracker before allowing users to submit jobs to queues and
+          administering these jobs and (b) by the JobTracker and the TaskTracker 
+          before allowing users to view job details or to modify a job using
+          MapReduce APIs, CLI or web user interfaces.</p>
           
           <p>A job submitter can specify access control lists for viewing or
           modifying a job via the configuration properties
@@ -1547,10 +1549,11 @@
             <li> killing/failing a task of a job </li>
             <li> setting the priority of a job </li>
           </ul>
-          <p>These operations are also protected by the queue level ACL,
-          "acl-administer-jobs", configured via mapred-queue-acls.xml. The caller
-          will be authorized against both queue level ACLs and job level ACLs,
-          depending on what is enabled.</p>
+          <p>These operations are also permitted by the queue level ACL,
+          "mapred.queue.queue-name.acl-administer-jobs", configured via
+          mapred-queue-acls.xml. The caller will be able to do the operation
+          if he/she is part of either queue admins ACL or job modification ACL.
+          </p>
           
           <p>The format of a job level ACL is the same as the format for a
           queue level ACL as defined in the

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 04:13:26 2011
@@ -900,8 +900,14 @@
 <property>
   <name>mapred.acls.enabled</name>
   <value>false</value>
-  <description> Specifies whether ACLs are enabled, and should be checked
-    for various operations.
+  <description> Specifies whether ACLs should be checked
+    for authorization of users for doing various queue and job level operations.
+    ACLs are disabled by default. If enabled, access control checks are made by
+    JobTracker and TaskTracker when requests are made by users for queue
+    operations like submit job to a queue and kill a job in the queue and job
+    operations like viewing the job-details (See mapreduce.job.acl-view-job)
+    or for modifying the job (See mapreduce.job.acl-modify-job) using
+    Map/Reduce APIs, RPCs or via the console and web user interfaces.
   </description>
 </property>
 
@@ -917,68 +923,62 @@
 </property>
 
 <property>
-  <name>mapreduce.cluster.job-authorization-enabled</name>
-  <value>false</value>
-  <description> Boolean flag that specifies if job-level authorization checks
-  should be enabled on the jobs submitted to the cluster.  Job-level
-  authorization is enabled if this flag is set to true or disabled otherwise.
-  It is disabled by default. If enabled, access control checks are made by
-  JobTracker and TaskTracker when requests are made by users for viewing the
-  job-details (See mapreduce.job.acl-view-job) or for modifying the job
-  (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
-  console and web user interfaces.
-  </description>
-</property>
-
-<property>
   <name>mapreduce.job.acl-modify-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'modifying' the job. It
     is only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapred.acls.enabled to true.
     This specifies the list of users and/or groups who can do modification
     operations on the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard all the modifications with respect to this
-    job and takes care of all the following operations:
+    users/groups to modify this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard all the modifications with respect
+    to this job and takes care of all the following operations:
       o killing this job
       o killing a task of this job, failing a task of this job
       o setting the priority of this job
     Each of these operations are also protected by the per-queue level ACL
     "acl-administer-jobs" configured via mapred-queues.xml. So a caller should
-    have the authorization to satisfy both the queue-level ACL and the
+    have the authorization to satisfy either the queue-level ACL or the
     job-level ACL.
 
-    Irrespective of this ACL configuration, job-owner, superuser and members
-    of supergroup configured on JobTracker via mapred.permissions.supergroup,
+    Irrespective of this ACL configuration, job-owner, superuser, members
+    of supergroup configured on JobTracker via mapred.permissions.supergroup
+    and administrators of the queue to which this job is submitted
     can do all the modification operations.
 
-    By default, nobody else besides job-owner, superuser/supergroup can
-    perform modification operations on a job that they don't own.
+    By default, nobody else besides job-owner, superuser, members of supergroup
+    and queue administrators can perform modification operations on a job.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.acl-view-job</name>
-  <value></value>
+  <value> </value>
   <description> Job specific access-control list for 'viewing' the job. It is
     only used if authorization is enabled in Map/Reduce by setting the
-    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    configuration property mapred.acls.enabled to true.
     This specifies the list of users and/or groups who can view private details
     about the job. For specifying a list of users and groups the
     format to use is "user1,user2 group1,group". If set to '*', it allows all
-    users/groups to modify this job. If set to '', it allows none. This
-    configuration is used to guard some of the job-views and at present only
-    protects APIs that can return possibly sensitive information of the
-    job-owner like
+    users/groups to view this job. If set to ' '(i.e. space), it allows
+    none. This configuration is used to guard some of the job-views and at
+    present only protects APIs that can return possibly sensitive information
+    of the job-owner like
       o job-level counters
       o task-level counters
       o tasks' diagnostic information
       o task-logs displayed on the TaskTracker web-UI and
       o job.xml showed by the JobTracker's web-UI
-    Every other piece information of jobs is still accessible by any other
-    users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+    Every other piece of information of jobs is still accessible by any other
+    user, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+    
+    Irrespective of this ACL configuration, the job-owner, the user who started
+    the cluster and members of the supergroup configured on the JobTracker via
+    mapred.permissions.supergroup can do all the view operations.
+    
+    By default, nobody besides the job-owner, the superuser and members of the
+    supergroup can perform view operations on a job.
   </description>
 </property>
 

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ACLsManager.java?rev=1077423&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ACLsManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ACLsManager.java Fri Mar  4 04:13:26 2011
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.AuditLogger.Constants;
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Manages MapReduce cluster administrators and access checks for
+ * job level operations and queue level operations.
+ * Uses JobACLsManager for access checks of job level operations and
+ * QueueManager for queue operations.
+ */
+class ACLsManager {
+
+  // MROwner(user who started this mapreduce cluster)'s ugi
+  private final UserGroupInformation mrOwner;
+  // members of supergroup are mapreduce cluster administrators
+  private final String superGroup;
+  
+  private final JobACLsManager jobACLsManager;
+  private final QueueManager queueManager;
+  
+  private final boolean aclsEnabled;
+
+  ACLsManager(Configuration conf, JobACLsManager jobACLsManager,
+      QueueManager queueManager) throws IOException {
+
+    if (UserGroupInformation.isLoginKeytabBased()) {
+      mrOwner = UserGroupInformation.getLoginUser();
+    } else {
+      mrOwner = UserGroupInformation.getCurrentUser();
+    }
+
+    superGroup = conf.get(JobConf.MR_SUPERGROUP, "supergroup");
+    
+    aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+
+    this.jobACLsManager = jobACLsManager;
+
+    this.queueManager = queueManager;
+  }
+
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
+  String getSuperGroup() {
+    return superGroup;
+  }
+
+  JobACLsManager getJobACLsManager() {
+    return jobACLsManager;
+  }
+
+  /**
+   * Is the calling user an admin for the mapreduce cluster ?
+   * i.e. either cluster owner or member of mapred.permissions.supergroup.
+   * @return true, if user is an admin
+   */
+  boolean isMRAdmin(UserGroupInformation callerUGI) {
+    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+      return true;
+    }
+    String[] groups = callerUGI.getGroupNames();
+    for(int i=0; i < groups.length; ++i) {
+      if (groups[i].equals(superGroup)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed queue-operation and the passed
+   * job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>If the operation is not a job operation(for eg. submit-job-to-queue),
+   *  then allow only (a) clusterOwner(who started the cluster), (b) members of
+   *  supergroup and (c) members of queue admins acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) members of supergroup,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the jobOperation</li>
+   * </ul>
+   * 
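+   * @param job job whose queue name, owner and job ACLs are checked against
+   * @param callerUGI user who is trying to perform the operation
+   * @param qOperation queue operation being attempted; may be null
+   * @param jobOperation job operation being attempted
+   * @throws AccessControlException if ACLs are enabled and the operation is
+   *           not permitted for the caller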
+   */
+  void checkAccess(JobInProgress job,
+      UserGroupInformation callerUGI, QueueOperation qOperation,
+      JobACL jobOperation) throws AccessControlException {
+
+    String queue = job.getProfile().getQueueName();
+    String jobId = job.getJobID().toString();
+    JobStatus jobStatus = job.getStatus();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl = jobStatus.getJobACLs().get(jobOperation);
+
+    checkAccess(jobId, callerUGI, queue, qOperation,
+        jobOperation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) members of supergroup,
+   * (d) members of job acl for the jobOperation</li>
+   * </ul>
+   */
+  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+      JobACL jobOperation) throws AccessControlException {
+
+    String jobId = jobStatus.getJobID().toString();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList jobAcl = jobStatus.getJobACLs().get(jobOperation);
+
+    // If acls are enabled, check if jobOwner, cluster admin or part of job ACL
+    checkAccess(jobId, callerUGI, jobOperation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>Otherwise, allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) members of supergroup,
+   * (d) members of job acl for the jobOperation</li>
+   * </ul>
+   */
+  void checkAccess(String jobId, UserGroupInformation callerUGI,
+      JobACL jobOperation, String jobOwner, AccessControlList jobAcl)
+      throws AccessControlException {
+    // TODO: Queue admins are to be allowed to do the job view operation.
+    checkAccess(jobId, callerUGI, null, null, jobOperation, jobOwner, jobAcl);
+  }
+
+  /**
+   * Check the ACLs for a user doing the passed queue-operation and the passed
+   * job operation.
+   * <ul>
+   * <li>If ACLs are disabled, allow all users.</li>
+   * <li>If the operation is not a job operation(for eg. submit-job-to-queue),
+   *  then allow only (a) clusterOwner(who started the cluster), (b) members of
+   *  supergroup and (c) members of queue admins acl for the queue.</li>
+   * <li>If the operation is a job operation, then allow only (a) jobOwner,
+   * (b) clusterOwner(who started the cluster), (c) members of supergroup,
+   * (d) members of queue admins acl for the queue and (e) members of job
+   * acl for the jobOperation</li>
+   * </ul>
+   * 
+   * callerUGI user who is trying to perform the qOperation/jobOperation.
+   * jobAcl could be job-view-acl or job-modify-acl depending on jobOperation.
+   */
+  void checkAccess(String jobId, UserGroupInformation callerUGI,
+      String queue, QueueOperation qOperation,
+      JobACL jobOperation, String jobOwner, AccessControlList jobAcl)
+      throws AccessControlException {
+    if (!aclsEnabled) {
+      return;
+    }
+
+    String user = callerUGI.getShortUserName();
+
+    // Allow mapreduce cluster admins to do any queue operation and
+    // any job operation
+    if (isMRAdmin(callerUGI)) {
+      if (qOperation == QueueOperation.SUBMIT_JOB) {
+        AuditLogger.logSuccess(user, qOperation.name(), queue);
+      } else {
+        AuditLogger.logSuccess(user, jobOperation.name(), jobId);
+      }
+      return;
+    }
+
+    if (qOperation == QueueOperation.SUBMIT_JOB) {
+      // This is strictly queue operation(not a job operation) like
+      // submit-job-to-queue.
+      if (!queueManager.hasAccess(queue, qOperation, callerUGI)) {
+        AuditLogger.logFailure(user, qOperation.name(), null, queue,
+            Constants.UNAUTHORIZED_USER + ", job : " + jobId);
+
+        throw new AccessControlException("User "
+            + callerUGI.getShortUserName() + " cannot perform "
+            + "operation " + qOperation + " on queue " + queue
+            + ".\n Please run \"hadoop queue -showacls\" "
+            + "command to find the queues you have access to .");
+      } else {
+        AuditLogger.logSuccess(user, qOperation.name(), queue);
+        return;
+      }
+    }
+
+    if (jobOperation == JobACL.VIEW_JOB) {
+      // check if jobOwner or part of acl-view-job
+      if (jobACLsManager.checkAccess(callerUGI, jobOperation,
+          jobOwner, jobAcl)) {
+        AuditLogger.logSuccess(user, jobOperation.name(), jobId.toString());
+        return;
+      }
+      else {
+        AuditLogger.logFailure(user, jobOperation.name(), null,
+            jobId.toString(), Constants.UNAUTHORIZED_USER);
+        throw new AccessControlException("User "
+            + callerUGI.getShortUserName() + " cannot perform operation "
+            + jobOperation + " on " + jobId);
+      }
+    }
+
+    if (jobOperation == JobACL.MODIFY_JOB) {
+      // check if queueAdmin, jobOwner or part of acl-modify-job
+      if (queueManager.hasAccess(queue, qOperation, callerUGI)) {
+        AuditLogger.logSuccess(user, qOperation.name(), queue);
+        return;
+      } else if (jobACLsManager.checkAccess(callerUGI, jobOperation,
+                 jobOwner, jobAcl)) {
+        AuditLogger.logSuccess(user, jobOperation.name(), jobId);
+        return;
+      }
+      AuditLogger.logFailure(user, jobOperation.name(), null,
+          jobId.toString(), Constants.UNAUTHORIZED_USER + ", queue : "
+          + queue);
+
+      throw new AccessControlException("User "
+          + callerUGI.getShortUserName() + " cannot perform operation "
+          + jobOperation + " on " + jobId + " that is in the queue "
+          + queue);
+    }
+
+    throw new AccessControlException("Unsupported queue operation "
+        + qOperation + " on queue " + queue + ", job operation "
+        + jobOperation + " on job " + jobId);
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/CompletedJobStatusStore.java Fri Mar  4 04:13:26 2011
@@ -49,7 +49,7 @@ class CompletedJobStatusStore implements
   private FileSystem fs;
   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
 
-  private JobACLsManager jobACLsManager = null;
+  private ACLsManager aclsManager;
 
   public static final Log LOG =
           LogFactory.getLog(CompletedJobStatusStore.class);
@@ -57,7 +57,8 @@ class CompletedJobStatusStore implements
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+
+  CompletedJobStatusStore(Configuration conf, ACLsManager aclsManager)
       throws IOException {
     active =
       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
@@ -87,7 +88,7 @@ class CompletedJobStatusStore implements
         deleteJobStatusDirs();
       }
 
-      this.jobACLsManager = aclsManager;
+      this.aclsManager = aclsManager;
 
       LOG.info("Completed job store activated/configured with retain-time : " 
                + retainTime + " , job-info-dir : " + jobInfoDir);
@@ -285,7 +286,7 @@ class CompletedJobStatusStore implements
   }
 
   /**
-   * This method retrieves Counters information from DFS stored using
+   * This method retrieves Counters information from file stored using
    * store method.
    *
    * @param jobId the jobId for which Counters is queried
@@ -299,7 +300,8 @@ class CompletedJobStatusStore implements
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
           JobStatus jobStatus = readJobStatus(dataIn);
-          jobACLsManager.checkAccess(jobStatus,
+          // authorize the user for job view access
+          aclsManager.checkAccess(jobStatus,
               UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
           readJobProfile(dataIn);
           counters = readCounters(dataIn);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JSPUtil.java Fri Mar  4 04:13:26 2011
@@ -95,14 +95,14 @@ class JSPUtil {
    *         and decide if view should be allowed or not. Job will be null if
    *         the job with given jobid doesnot exist at the JobTracker.
    */
-  public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+  public static JobWithViewAccessCheck checkAccessAndGetJob(final JobTracker jt,
       JobID jobid, HttpServletRequest request, HttpServletResponse response)
       throws ServletException, IOException {
     final JobInProgress job = jt.getJob(jobid);
     JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
 
     String user = request.getRemoteUser();
-    if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+    if (user != null && job != null && jt.areACLsEnabled()) {
       final UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(user);
       try {
@@ -110,7 +110,7 @@ class JSPUtil {
           public Void run() throws IOException, ServletException {
 
             // checks job view permission
-            job.checkAccess(ugi, JobACL.VIEW_JOB);
+            jt.getACLsManager().checkAccess(job, ugi, null, JobACL.VIEW_JOB);
             return null;
           }
         });
@@ -486,9 +486,11 @@ class JSPUtil {
     } else {
       currentUser = UserGroupInformation.createRemoteUser(user);
     }
-    jobTracker.getJobACLsManager().checkAccess(JobID.forName(jobid),
-        currentUser, JobACL.VIEW_JOB,
+
+    // Authorize the user for view access of this job
+    jobTracker.getACLsManager().checkAccess(jobid, currentUser, JobACL.VIEW_JOB,
         jobInfo.get(Keys.USER), jobInfo.getJobACLs().get(JobACL.VIEW_JOB));
+
     return jobInfo;
   }
 
@@ -559,7 +561,7 @@ class JSPUtil {
   static void printJobACLs(JobTracker tracker,
       Map<JobACL, AccessControlList> jobAcls, JspWriter out)
       throws IOException {
-    if (tracker.isJobLevelAuthorizationEnabled()) {
+    if (tracker.areACLsEnabled()) {
       // Display job-view-acls and job-modify-acls configured for this job
       out.print("<b>Job-ACLs:</b><br>");
       for (JobACL aclName : JobACL.values()) {
@@ -572,6 +574,10 @@ class JSPUtil {
         }
       }
     }
+    else {
+      out.print("<b>Job-ACLs: " + new AccessControlList("*").toString()
+          + "</b><br>");
+    }
   }
 
   static boolean privateActionsAllowed(JobConf conf) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobACLsManager.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobACLsManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobACLsManager.java Fri Mar  4 04:13:26 2011
@@ -20,8 +20,6 @@ package org.apache.hadoop.mapred;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapred.AuditLogger;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
@@ -29,14 +27,17 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
-public abstract class JobACLsManager {
+class JobACLsManager {
 
-	  static final Log LOG = LogFactory.getLog(JobACLsManager.class);
+	  JobConf conf;
+	  
+	  public JobACLsManager(JobConf conf) {
+        this.conf = conf;
+      }
 
-	  protected abstract boolean isJobLevelAuthorizationEnabled();
-
-	  protected abstract boolean isSuperUserOrSuperGroup(
-	      UserGroupInformation callerUGI);
+	  boolean areACLsEnabled() {
+	    return conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+	  }
 
 	  /**
 	   * Construct the jobACLs from the configuration so that they can be kept in
@@ -51,7 +52,7 @@ public abstract class JobACLsManager {
 	      new HashMap<JobACL, AccessControlList>();
 
 	    // Don't construct anything if authorization is disabled.
-	    if (!isJobLevelAuthorizationEnabled()) {
+	    if (!areACLsEnabled()) {
 	      return acls;
 	    }
 
@@ -69,70 +70,35 @@ public abstract class JobACLsManager {
 	  }
 
 	  /**
-	   * If authorization is enabled, checks whether the user (in the callerUGI) is
-	   * authorized to perform the operation specified by 'jobOperation' on the job.
-	   * <ul>
-	   * <li>The owner of the job can do any operation on the job</li>
-	   * <li>The superuser/supergroup is always permitted to do operations on any
-	   * job.</li>
-	   * <li>For all other users/groups job-acls are checked</li>
-	   * </ul>
-	   * 
-	   * @param jobStatus
-	   * @param callerUGI
-	   * @param jobOperation
-	   */
-	  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
-	      JobACL jobOperation) throws AccessControlException {
-
-	    JobID jobId = jobStatus.getJobID();
-	    String jobOwner = jobStatus.getUsername();
-	    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
-	    checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
-	  }
-
-	  /**
-	   * If authorization is enabled, checks whether the user (in the callerUGI) is
-	   * authorized to perform the operation specified by 'jobOperation' on the job.
+	   * If authorization is enabled, checks whether the user (in the callerUGI)
+	   * is authorized to perform the operation specified by 'jobOperation' on
+	   * the job, i.e. whether the user is the jobOwner or is allowed by the job
+	   * ACL for that operation. Returns true if so, or if ACLs are disabled.
 	   * <ul>
 	   * <li>The owner of the job can do any operation on the job</li>
-	   * <li>The superuser/supergroup is always permitted to do operations on any
-	   * job.</li>
 	   * <li>For all other users/groups job-acls are checked</li>
 	   * </ul>
-	   * @param jobId
 	   * @param callerUGI
 	   * @param jobOperation
 	   * @param jobOwner
 	   * @param jobACL
 	   * @throws AccessControlException
 	   */
-	  void checkAccess(JobID jobId, UserGroupInformation callerUGI,
+	  boolean checkAccess(UserGroupInformation callerUGI,
 	      JobACL jobOperation, String jobOwner, AccessControlList jobACL)
 	      throws AccessControlException {
 
 	    String user = callerUGI.getShortUserName();
-	    if (!isJobLevelAuthorizationEnabled()) {
-	      return;
+	    if (!areACLsEnabled()) {
+	      return true;
 	    }
 
-	    // Allow superusers/supergroups
-	    // Allow Job-owner as the job's owner is always part of all the ACLs
-	    if (callerUGI.getShortUserName().equals(jobOwner)
-	        || isSuperUserOrSuperGroup(callerUGI) 
+	    // Allow Job-owner for any operation on the job
+	    if (user.equals(jobOwner) 
 	        || jobACL.isUserAllowed(callerUGI)) {
-	      AuditLogger.logSuccess(user, jobOperation.name(),  jobId.toString());
-	      return;
+	      return true;
 	    }
 
-	    // log this event to the audit log
-	    AuditLogger.logFailure(user, jobOperation.name(), jobACL.toString(), 
-	                           jobId.toString(), Constants.UNAUTHORIZED_USER);
-	    throw new AccessControlException(callerUGI
-	        + " is not authorized for performing the operation "
-	        + jobOperation.toString() + " on " + jobId + ". "
-	        + jobOperation.toString()
-	        + " Access control list configured for this job : "
-	        + jobACL.toString());
+	    return false;
 	  }
 	}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar  4 04:13:26 2011
@@ -164,8 +164,8 @@ public class JobConf extends Configurati
   static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.job.reduce.memory.mb";
 
-  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
-	    "mapreduce.cluster.job-authorization-enabled";
+  static final String MR_ACLS_ENABLED = "mapred.acls.enabled";
+
   static final String MR_SUPERGROUP = "mapred.permissions.supergroup";
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar  4 04:13:26 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -55,7 +54,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
 
@@ -117,6 +115,7 @@ public class JobHistory {
   private static FileSystem DONEDIR_FS; // Done dir filesystem
   private static JobConf jtConf;
   private static Path DONE = null; // folder for completed jobs
+  private static boolean aclsEnabled = false;
   /**
    * A filter for conf files
    */  
@@ -346,6 +345,9 @@ public class JobHistory {
                      3 * 1024 * 1024);
       jtConf = conf;
 
+      // whether queue and job level ACLs are enabled on the mapreduce cluster
+      aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+
       // initialize the file manager
       fileManager = new JobHistoryFilesManager(conf, jobTracker);
     } catch(IOException e) {
@@ -1252,14 +1254,19 @@ public class JobHistory {
           // Log the history meta info
           JobHistory.MetaInfoManager.logMetaInfo(writers);
 
+          String viewJobACL = "*";
+          String modifyJobACL = "*";
+          if (aclsEnabled) {
+            viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
+            modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
+          }
           //add to writer as well 
           JobHistory.log(writers, RecordTypes.Job, 
                          new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF, 
                                       Keys.VIEW_JOB, Keys.MODIFY_JOB }, 
                          new String[]{jobId.toString(), jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath,
-                                      jobConf.get(JobACL.VIEW_JOB.getAclName(), ""),
-                                      jobConf.get(JobACL.MODIFY_JOB.getAclName(), "")}
+                                      viewJobACL, modifyJobACL}
                         ); 
              
         }catch(IOException e){

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:13:26 2011
@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.AuditLogger;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -62,7 +60,6 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -707,25 +704,6 @@ public class JobInProgress {
     return allTaskSplitMetaInfo;
   }
 
-  /**
-   * If authorization is enabled on the JobTracker, checks whether the user (in
-   * the callerUGI) is authorized to perform the operation specify by
-   * 'jobOperation' on the job.
-   * <ul>
-   * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup of the JobTracker is always permitted to do
-   * operations on any job.</li>
-   * <li>For all other users/groups job-acls are checked</li>
-   * </ul>
-   * 
-   * @param callerUGI
-   * @param jobOperation
-   */
-  void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
-      throws AccessControlException {
-    jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
-  }
-
   /////////////////////////////////////////////////////
   // Accessors for the JobInProgress
   /////////////////////////////////////////////////////

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:13:26 2011
@@ -19,18 +19,14 @@ package org.apache.hadoop.mapred;
 
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.net.BindException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
@@ -74,7 +70,6 @@ import org.apache.hadoop.mapreduce.secur
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
@@ -96,6 +91,7 @@ import org.apache.hadoop.security.Refres
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -1697,8 +1693,8 @@ public class JobTracker implements MRCon
 
           // check the access
           try {
-            checkAccess(job, ugi, QueueManager.QueueOperation.SUBMIT_JOB,
-                        null);
+            aclsManager.checkAccess(job, ugi,
+                QueueManager.QueueOperation.SUBMIT_JOB, null);
           } catch (Throwable t) {
             LOG.warn("Access denied for user " + ugi.getShortUserName() 
                      + " in groups : [" 
@@ -1942,7 +1938,6 @@ public class JobTracker implements MRCon
                                                 "expireLaunchingTasks");
 
   CompletedJobStatusStore completedJobStatusStore = null;
-  private JobTrackerJobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   RecoveryManager recoveryManager;
 
@@ -1982,8 +1977,8 @@ public class JobTracker implements MRCon
   FileSystem fs = null;
   Path systemDir = null;
   JobConf conf;
-  private final UserGroupInformation mrOwner;
-  private final String supergroup;
+
+  private final ACLsManager aclsManager;
 
   long limitMaxMemForMapTasks;
   long limitMaxMemForReduceTasks;
@@ -2024,16 +2019,7 @@ public class JobTracker implements MRCon
     // get the desired principal to load
     UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, JT_KEYTAB_FILE, JT_USER_NAME, localMachine);
-    if (UserGroupInformation.isLoginKeytabBased()) {
-      mrOwner = UserGroupInformation.getLoginUser();
-    } else {
-      mrOwner = UserGroupInformation.getCurrentUser();
-    }
-  
-    supergroup = conf.get(JobConf.MR_SUPERGROUP,
-                          "supergroup");
-    LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName() 
-             + " and supergroup as " + supergroup);
+
     long secretKeyInterval = 
     conf.getLong(DELEGATION_KEY_UPDATE_INTERVAL_KEY, 
                    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -2096,7 +2082,13 @@ public class JobTracker implements MRCon
 
     Configuration queuesConf = new Configuration(this.conf);
     queueManager = new QueueManager(queuesConf);
-    
+
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);
+
+    LOG.info("Starting jobtracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
+
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
       = conf.getClass("mapred.jobtracker.taskScheduler",
@@ -2136,7 +2128,7 @@ public class JobTracker implements MRCon
     // initialize history parameters.
     final JobTracker jtFinal = this;
     boolean historyInitialized = 
-      mrOwner.doAs(new PrivilegedExceptionAction<Boolean>() {
+      getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
         @Override
         public Boolean run() throws Exception {
           return JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
@@ -2182,7 +2174,7 @@ public class JobTracker implements MRCon
       try {
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
-          fs = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
             public FileSystem run() throws IOException {
               return FileSystem.get(conf);
           }});
@@ -2194,9 +2186,10 @@ public class JobTracker implements MRCon
         }
         try {
           FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(mrOwner.getShortUserName())) {
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
             throw new AccessControlException("The systemdir " + systemDir +
-                " is not owned by " + mrOwner.getShortUserName());
+                " is not owned by " + getMROwner().getShortUserName());
           }
           if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
             LOG.warn("Incorrect permissions on " + systemDir +
@@ -2257,7 +2250,8 @@ public class JobTracker implements MRCon
 
     // Initialize history DONE folder
     if (historyInitialized) {
-      FileSystem historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
+      FileSystem historyFS = getMROwner().doAs(
+          new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
           JobHistory.initDone(conf, fs);
           final String historyLogDir = 
@@ -2276,10 +2270,8 @@ public class JobTracker implements MRCon
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    // Initialize the jobACLSManager
-    jobACLsManager = new JobTrackerJobACLsManager(this);
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -3684,7 +3676,8 @@ public class JobTracker implements MRCon
 
       // check for access
       try {
-        checkAccess(job, ugi, QueueManager.QueueOperation.SUBMIT_JOB, null);
+        aclsManager.checkAccess(job, ugi,
+            QueueManager.QueueOperation.SUBMIT_JOB, null);
       } catch (IOException ioe) {
         LOG.warn("Access denied for user " + job.getJobConf().getUser()
             + ". Ignoring job " + jobId, ioe);
@@ -3723,7 +3716,7 @@ public class JobTracker implements MRCon
     try{
       final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
-      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+      return getMROwner().doAs(new PrivilegedExceptionAction<String>() {
         @Override
         public String run() throws Exception {
           return getStagingAreaDirInternal(user);
@@ -3774,54 +3767,12 @@ public class JobTracker implements MRCon
   }
 
   /**
-   * Is job-level authorization enabled on the JT?
+   * Are ACLs for authorization checks enabled on the JT?
    * 
    * @return
    */
-  boolean isJobLevelAuthorizationEnabled() {
-    return conf.getBoolean(JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
-  }
-
-  /**
-   * Check the ACLs for a user doing the passed queue-operation and the passed
-   * job operation.
-   * <ul>
-   * <li>Superuser/supergroup can do any operation on the job</li>
-   * <li>For any other user/group, the configured ACLs for the corresponding
-   * queue and the job are checked.</li>
-   * </ul>
-   * 
-   * @param job
-   * @param callerUGI
-   * @param oper
-   * @param jobOperation
-   * @throws AccessControlException
-   * @throws IOException
-   */
-  private void checkAccess(JobInProgress job,
-      UserGroupInformation callerUGI, QueueManager.QueueOperation oper,
-      JobACL jobOperation) throws AccessControlException {
-
-    // get the queue and verify the queue access
-    String queue = job.getProfile().getQueueName();
-    if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
-      throw new AccessControlException("User " 
-                            + callerUGI.getShortUserName() 
-                            + " cannot perform "
-                            + "operation " + oper + " on queue " + queue +
-                            ".\n Please run \"hadoop queue -showacls\" " +
-                            "command to find the queues you have access" +
-                            " to .");
-    }
-
-    // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
-    // job itself isn't created by that time.
-    if (jobOperation == null) {
-      return;
-    }
-
-    // check the access to the job
-    job.checkAccess(callerUGI, jobOperation);
+  boolean areACLsEnabled() {
+    return conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
   }
 
   /**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3884,7 +3835,7 @@ public class JobTracker implements MRCon
     }
         
     // check both queue-level and job-level access
-    checkAccess(job, UserGroupInformation.getCurrentUser(),
+    aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
         QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
 
     killJob(job);
@@ -4085,17 +4036,18 @@ public class JobTracker implements MRCon
   
   private static final Counters EMPTY_COUNTERS = new Counters();
   public Counters getJobCounters(JobID jobid) throws IOException {
+    UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
 
         // check the job-access
-        job.checkAccess(UserGroupInformation.getCurrentUser(),
-            JobACL.VIEW_JOB);
+        aclsManager.checkAccess(job, callerUGI, null, JobACL.VIEW_JOB);
 
         return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
       } 
     }
+
     return completedJobStatusStore.readCounters(jobid);
   }
   
@@ -4106,7 +4058,7 @@ public class JobTracker implements MRCon
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
           JobACL.VIEW_JOB);
     }
     if (job == null || !isJobInited(job)) {
@@ -4134,7 +4086,7 @@ public class JobTracker implements MRCon
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
           JobACL.VIEW_JOB);
     }
     if (job == null || !isJobInited(job)) {
@@ -4160,7 +4112,7 @@ public class JobTracker implements MRCon
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
           JobACL.VIEW_JOB);
     }
     if (job == null || !isJobInited(job)) {
@@ -4189,7 +4141,7 @@ public class JobTracker implements MRCon
     JobInProgress job = jobs.get(jobid);
     if (job != null) {
       // Check authorization
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
           JobACL.VIEW_JOB);
     }
     if (job == null || !isJobInited(job)) {
@@ -4256,7 +4208,7 @@ public class JobTracker implements MRCon
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
       // Check authorization
-      job.checkAccess(UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(), null,
           JobACL.VIEW_JOB);
     }
     if (job != null && isJobInited(job)) {
@@ -4316,7 +4268,8 @@ public class JobTracker implements MRCon
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
       // check both queue-level and job-level access
-      checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(tip.getJob(),
+          UserGroupInformation.getCurrentUser(),
           QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
 
       return tip.killTask(taskid, shouldFail);
@@ -4388,7 +4341,7 @@ public class JobTracker implements MRCon
     if (job != null) {
 
       // check both queue-level and job-level access
-      checkAccess(job, UserGroupInformation.getCurrentUser(),
+      aclsManager.checkAccess(job, UserGroupInformation.getCurrentUser(),
           QueueManager.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
 
       synchronized (taskScheduler) {
@@ -4579,24 +4532,6 @@ public class JobTracker implements MRCon
       removeMarkedTasks(trackerName);
     }
   }
-  
-  /**
-   * Is the calling user a super user? Or part of the supergroup?
-   * @return true, if it is a super user
-   */
-  static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
-      UserGroupInformation superUser, String superGroup) {
-    if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
-      return true;
-    }
-    String[] groups = callerUGI.getGroupNames();
-    for(int i=0; i < groups.length; ++i) {
-      if (groups[i].equals(superGroup)) {
-        return true;
-      }
-    }
-    return false;
-  }
 
   /**
    * Rereads the config to get hosts and exclude list file names.
@@ -4605,10 +4540,9 @@ public class JobTracker implements MRCon
   public synchronized void refreshNodes() throws IOException {
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     // check access
-    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
-                                 supergroup)) {
+    if (!isMRAdmin(UserGroupInformation.getCurrentUser())) {
       AuditLogger.logFailure(user, Constants.REFRESH_NODES, 
-          mrOwner + " " + supergroup, Constants.JOBTRACKER, 
+          getMROwner() + " " + getSuperGroup(), Constants.JOBTRACKER, 
           Constants.UNAUTHORIZED_USER);
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4618,15 +4552,19 @@ public class JobTracker implements MRCon
     // call the actual api
     refreshHosts();
   }
-  
+
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   private synchronized void refreshHosts() throws IOException {
     // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
@@ -4992,6 +4930,11 @@ public class JobTracker implements MRCon
   }
 
   JobACLsManager getJobACLsManager() {
-    return jobACLsManager;
+    return aclsManager.getJobACLsManager();
+  }
+
+  ACLsManager getACLsManager() {
+    return aclsManager;
   }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java Fri Mar  4 04:13:26 2011
@@ -19,14 +19,11 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.TreeSet;
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -75,28 +72,23 @@ class QueueManager {
    * Enum representing an operation that can be performed on a queue.
    */
   static enum QueueOperation {
-    SUBMIT_JOB ("acl-submit-job", false),
-    ADMINISTER_JOBS ("acl-administer-jobs", true);
+    SUBMIT_JOB ("acl-submit-job"),
+    ADMINISTER_JOBS ("acl-administer-jobs");
     // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
     //       users in UI
     // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
     //       configuring queues.
     
     private final String aclName;
-    private final boolean jobOwnerAllowed;
     
-    QueueOperation(String aclName, boolean jobOwnerAllowed) {
+    QueueOperation(String aclName) {
       this.aclName = aclName;
-      this.jobOwnerAllowed = jobOwnerAllowed;
     }
 
     final String getAclName() {
       return aclName;
     }
     
-    final boolean isJobOwnerAllowed() {
-      return jobOwnerAllowed;
-    }
   }
   
   /**
@@ -126,7 +118,7 @@ class QueueManager {
   }
   
   /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
+   * Return true if the given {@link QueueOperation} can be 
    * performed by the specified user on the given queue.
    * 
    * An operation is allowed if all users are provided access for this
@@ -139,37 +131,9 @@ class QueueManager {
    * 
    * @return true if the operation is allowed, false otherwise.
    */
-  public synchronized boolean hasAccess(String queueName, QueueOperation oper,
-                                UserGroupInformation ugi) {
-    return hasAccess(queueName, null, oper, ugi);
-  }
-  
-  /**
-   * Return true if the given {@link QueueManager.QueueOperation} can be 
-   * performed by the specified user on the specified job in the given queue.
-   * 
-   * An operation is allowed either if the owner of the job is the user 
-   * performing the task, all users are provided access for this
-   * operation, or if either the user or any of the groups specified is
-   * provided access.
-   * 
-   * If the {@link QueueManager.QueueOperation} is not job specific then the 
-   * job parameter is ignored.
-   * 
-   * @param queueName Queue on which the operation needs to be performed.
-   * @param job The {@link JobInProgress} on which the operation is being
-   *            performed. 
-   * @param oper The operation to perform
-   * @param ugi The user and groups who wish to perform the operation.
-   * 
-   * @return true if the operation is allowed, false otherwise.
-   */
-  public synchronized boolean hasAccess(String queueName, JobInProgress job, 
+  public synchronized boolean hasAccess(String queueName,
                                 QueueOperation oper, 
                                 UserGroupInformation ugi) {
-    String user = ugi.getShortUserName();
-    String jobId = job == null ? "-" : job.getJobID().toString();
-    
     if (!aclsEnabled) {
       return true;
     }
@@ -179,17 +143,9 @@ class QueueManager {
                                             oper.getAclName()));      
     }
     
-    if (oper.isJobOwnerAllowed()) {
-      if (job != null && job.getJobConf().getUser().equals(ugi.getShortUserName())) {
-        AuditLogger.logSuccess(user, oper.name(), queueName);
-        return true;
-      }
-    }
-    
-    AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
+    AccessControlList acl = aclsMap.get(toFullPropertyName(
+        queueName, oper.getAclName()));
     if (acl == null) {
-      AuditLogger.logFailure(user, oper.name(), null, queueName, 
-                             "Disabled queue ACLs, job : " + jobId);
       return false;
     }
     
@@ -201,12 +157,6 @@ class QueueManager {
         allowed = true;
       }
     }
-    if (allowed) {
-      AuditLogger.logSuccess(user, oper.name(), queueName);
-    } else {
-      AuditLogger.logFailure(user, oper.name(), null, queueName,
-                             Constants.UNAUTHORIZED_USER + ", job : " + jobId);
-    }
     
     return allowed;    
   }
@@ -286,7 +236,7 @@ class QueueManager {
     for (String queue : queueNames) {
       for (QueueOperation oper : QueueOperation.values()) {
         String key = toFullPropertyName(queue, oper.getAclName());
-        String aclString = conf.get(key, "*");
+        String aclString = conf.get(key, " ");// default is empty list of users
         aclsMap.put(key, new AccessControlList(aclString));
       }
     } 
@@ -294,14 +244,14 @@ class QueueManager {
   }
   
   private void initialize(Configuration conf) {
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+    aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
     String[] queues = conf.getStrings("mapred.queue.names", 
         new String[] {JobConf.DEFAULT_QUEUE_NAME});
     addToSet(queueNames, queues);
     aclsMap = getQueueAcls(conf);
   }
   
-  private static final String toFullPropertyName(String queue, 
+  static final String toFullPropertyName(String queue, 
       String property) {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Fri Mar  4 04:13:26 2011
@@ -117,10 +117,10 @@ public class TaskLogServlet extends Http
    * users and groups specified in configuration using
    * mapreduce.job.acl-view-job to view job.
    */
-  private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+  private void checkAccessForTaskLogs(JobConf conf, String user, String jobId,
       TaskTracker tracker) throws AccessControlException {
 
-    if (!tracker.isJobLevelAuthorizationEnabled()) {
+    if (!tracker.areACLsEnabled()) {
       return;
     }
 
@@ -132,7 +132,7 @@ public class TaskLogServlet extends Http
     UserGroupInformation callerUGI =
         UserGroupInformation.createRemoteUser(user);
 
-    tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
+    tracker.getACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
         jobOwner, jobViewACL);
   }
 
@@ -228,7 +228,7 @@ public class TaskLogServlet extends Http
       Configuration jobACLConf = getConfFromJobACLsFile(attemptId, isCleanup);
       // Ignore authorization if job-acls.xml is not found
       if (jobACLConf != null) {
-        JobID jobId = attemptId.getJobID();
+        String jobId = attemptId.getJobID().toString();
 
         try {
           checkAccessForTaskLogs(new JobConf(jobACLConf), user, jobId,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 04:13:26 2011
@@ -291,8 +291,11 @@ abstract class TaskRunner extends Thread
       Localizer.PermissionsHandler.setPermissions(logDir,
           Localizer.PermissionsHandler.sevenZeroZero);
     }
-    // write job acls into a file to know the access for task logs
-    writeJobACLs(logDir);
+
+    if (tracker.areACLsEnabled()) {
+      // write job acls into a file to know the access for task logs
+      writeJobACLs(logDir);
+    }
     return logFiles;
   }
 
@@ -301,12 +304,12 @@ abstract class TaskRunner extends Thread
     File aclFile = new File(logDir, TaskRunner.jobACLsFile);
     Configuration aclConf = new Configuration(false);
 
-    // set the job view acls in aclConf
-    String jobViewACLs = conf.get(JobContext.JOB_ACL_VIEW_JOB);
-    if (jobViewACLs != null) {
-      aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
-    }
-    // set jobOwner as mapreduce.job.user.name in aclConf
+    // set the job view acl in aclConf (" " when none is configured)
+    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
+
+    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
+
+    // set jobOwner as user.name in aclConf
     String jobOwner = conf.getUser();
     aclConf.set("user.name", jobOwner);
     FileOutputStream out = new FileOutputStream(aclFile);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:13:26 2011
@@ -247,9 +247,7 @@ public class TaskTracker 
   private int maxReduceSlots;
   private int failures;
 
-  // MROwner's ugi
-  private UserGroupInformation mrOwner;
-  private String supergroup;
+  private ACLsManager aclsManager;
   
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
@@ -278,9 +276,6 @@ public class TaskTracker 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
 
-  // Manages job acls of jobs in TaskTracker
-  private TaskTrackerJobACLsManager jobACLsManager;
-
   /**
    * the minimum interval between jobtracker polls
    */
@@ -585,16 +580,11 @@ public class TaskTracker 
     this.fConf = new JobConf(originalConf);
     UserGroupInformation.setConfiguration(fConf);
     SecurityUtil.login(fConf, TT_KEYTAB_FILE, TT_USER_NAME);
-    if (UserGroupInformation.isLoginKeytabBased()) {
-      mrOwner = UserGroupInformation.getLoginUser();
-    } else {
-      mrOwner = UserGroupInformation.getCurrentUser();
-    }
 
-    supergroup = fConf.get(JobConf.MR_SUPERGROUP,
-                           "supergroup");
-    LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
-             + " and supergroup as " + supergroup);
+    aclsManager = new ACLsManager(fConf, new JobACLsManager(fConf), null);
+    LOG.info("Starting tasktracker with owner as " +
+        getMROwner().getShortUserName() + " and supergroup as " +
+        getSuperGroup());
 
     localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
@@ -691,7 +681,7 @@ public class TaskTracker 
         this.fConf, taskController);
 
     this.jobClient = (InterTrackerProtocol) 
-    mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
+    getMROwner().doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         return RPC.waitForProxy(InterTrackerProtocol.class,
             InterTrackerProtocol.versionID,
@@ -732,19 +722,22 @@ public class TaskTracker 
   }
 
   UserGroupInformation getMROwner() {
-    return mrOwner;
+    return aclsManager.getMROwner();
   }
 
   String getSuperGroup() {
-    return supergroup;
+    return aclsManager.getSuperGroup();
   }
-  
+
+  boolean isMRAdmin(UserGroupInformation ugi) {
+    return aclsManager.isMRAdmin(ugi);
+  }
+
   /**
-   * Is job level authorization enabled on the TT ?
+   * Are ACLs for authorization checks enabled on the TaskTracker?
    */
-  boolean isJobLevelAuthorizationEnabled() {
-    return fConf.getBoolean(
-        JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+  boolean areACLsEnabled() {
+    return fConf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -1257,8 +1250,7 @@ public class TaskTracker 
     checkJettyPort(httpPort);
     // create user log manager
     setUserLogManager(new UserLogManager(conf));
-    // Initialize the jobACLSManager
-    jobACLsManager = new TaskTrackerJobACLsManager(this);
+
     initialize();
   }
 
@@ -3823,7 +3815,11 @@ public class TaskTracker 
       return localJobTokenFileStr;
     }
 
-    TaskTrackerJobACLsManager getJobACLsManager() {
-      return jobACLsManager;
+    JobACLsManager getJobACLsManager() {
+      return aclsManager.getJobACLsManager();
+    }
+    
+    ACLsManager getACLsManager() {
+      return aclsManager;
     }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobACLs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobACLs.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobACLs.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobACLs.java Fri Mar  4 04:13:26 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobPrior
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,8 +71,11 @@ public class TestJobACLs {
     UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
     JobConf conf = new JobConf();
 
-    // Enable job-level authorization
-    conf.setBoolean(JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+    // Enable queue and job level authorization
+    conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
+    // no queue admins for default queue
+    conf.set(QueueManager.toFullPropertyName(
+        "default", QueueOperation.ADMINISTER_JOBS.getAclName()), " ");
 
     // Enable CompletedJobStore
     FileSystem fs = FileSystem.getLocal(conf);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar  4 04:13:26 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobHistory.*;
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.commons.logging.Log;
@@ -801,7 +802,7 @@ public class TestJobHistory extends Test
     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
 
     // Also JobACLs should be correct
-    if (mr.getJobTrackerRunner().getJobTracker().isJobLevelAuthorizationEnabled()) {
+    if (mr.getJobTrackerRunner().getJobTracker().areACLsEnabled()) {
       assertEquals(conf.get(JobACL.VIEW_JOB.getAclName()),
           jobInfo.getJobACLs().get(JobACL.VIEW_JOB).toString());
       assertEquals(conf.get(JobACL.MODIFY_JOB.getAclName()),
@@ -911,7 +912,10 @@ public class TestJobHistory extends Test
       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
 
       // Enable ACLs so that they are logged to history
-      conf.setBoolean(JobConf.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+      conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
+      // no queue admins for default queue
+      conf.set(QueueManager.toFullPropertyName(
+          "default", QueueOperation.ADMINISTER_JOBS.getAclName()), " ");
       
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar  4 04:13:26 2011
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import junit.framework.TestCase;
@@ -531,10 +532,11 @@ public class TestJobTrackerRestart exten
       jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
       jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
-      jtConf.setBoolean("mapred.acls.enabled", true);
+      jtConf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
       // get the user group info
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());
+      jtConf.set(QueueManager.toFullPropertyName("default",
+          QueueOperation.SUBMIT_JOB.getAclName()), ugi.getUserName());
       
       mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
       

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java?rev=1077423&r1=1077422&r2=1077423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueAclsForCurrentUser.java Fri Mar  4 04:13:26 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import javax.security.auth.login.LoginException;
 import junit.framework.TestCase;
+
+import org.apache.hadoop.mapred.QueueManager.QueueOperation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -31,23 +33,23 @@ public class TestQueueAclsForCurrentUser
   private QueueManager queueManager;
   private JobConf conf = null;
   UserGroupInformation currentUGI = null;
-  String submitAcl = QueueManager.QueueOperation.SUBMIT_JOB.getAclName();
-  String adminAcl  = QueueManager.QueueOperation.ADMINISTER_JOBS.getAclName();
+  String submitAcl = QueueOperation.SUBMIT_JOB.getAclName();
+  String adminAcl  = QueueOperation.ADMINISTER_JOBS.getAclName();
 
   private void setupConfForNoAccess() throws IOException,LoginException {
     currentUGI = UserGroupInformation.getLoginUser();
     String userName = currentUGI.getUserName();
     conf = new JobConf();
 
-    conf.setBoolean("mapred.acls.enabled",true);
+    conf.setBoolean(JobConf.MR_ACLS_ENABLED,true);
 
     conf.set("mapred.queue.names", "qu1,qu2");
     //Only user u1 has access
-    conf.set("mapred.queue.qu1.acl-submit-job", "u1");
-    conf.set("mapred.queue.qu1.acl-administer-jobs", "u1");
+    conf.set(QueueManager.toFullPropertyName("qu1", submitAcl), "u1");
+    conf.set(QueueManager.toFullPropertyName("qu1", adminAcl), "u1");
     //q2 only group g2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", " g2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", " g2");
+    conf.set(QueueManager.toFullPropertyName("qu2", submitAcl), " g2");
+    conf.set(QueueManager.toFullPropertyName("qu2", adminAcl), " g2");
     queueManager = new QueueManager(conf);
 
   }
@@ -61,27 +63,27 @@ public class TestQueueAclsForCurrentUser
     String userName = currentUGI.getUserName();
     conf = new JobConf();
 
-    conf.setBoolean("mapred.acls.enabled", aclSwitch);
+    conf.setBoolean(JobConf.MR_ACLS_ENABLED, aclSwitch);
 
     conf.set("mapred.queue.names", "qu1,qu2,qu3,qu4,qu5,qu6,qu7");
     //q1 Has acls for all the users, supports both submit and administer
-    conf.set("mapred.queue.qu1.acl-submit-job", "*");
-    conf.set("mapred.queue.qu1-acl-administer-jobs", "*");
+    conf.set(QueueManager.toFullPropertyName("qu1", submitAcl), "*");
+    conf.set(QueueManager.toFullPropertyName("qu1", adminAcl), "*");
     //q2 only u2 has acls for the queues
-    conf.set("mapred.queue.qu2.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu2.acl-administer-jobs", "u2");
+    conf.set(QueueManager.toFullPropertyName("qu2", submitAcl), "u2");
+    conf.set(QueueManager.toFullPropertyName("qu2", adminAcl), "u2");
     //q3  Only u2 has submit operation access rest all have administer access
-    conf.set("mapred.queue.qu3.acl-submit-job", "u2");
-    conf.set("mapred.queue.qu3.acl-administer-jobs", "*");
+    conf.set(QueueManager.toFullPropertyName("qu3", submitAcl), "u2");
+    conf.set(QueueManager.toFullPropertyName("qu3", adminAcl), "*");
     //q4 Only u2 has administer access , anyone can do submit
-    conf.set("mapred.queue.qu4.acl-submit-job", "*");
-    conf.set("mapred.queue.qu4.acl-administer-jobs", "u2");
+    conf.set(QueueManager.toFullPropertyName("qu4", submitAcl), "*");
+    conf.set(QueueManager.toFullPropertyName("qu4", adminAcl), "u2");
     //qu6 only current user has submit access
-    conf.set("mapred.queue.qu6.acl-submit-job",userName);
-    conf.set("mapred.queue.qu6.acl-administrator-jobs","u2");
+    conf.set(QueueManager.toFullPropertyName("qu6", submitAcl),userName);
+    conf.set(QueueManager.toFullPropertyName("qu6", adminAcl),"u2");
     //qu7 only current user has administrator access
-    conf.set("mapred.queue.qu7.acl-submit-job","u2");
-    conf.set("mapred.queue.qu7.acl-administrator-jobs",userName);
+    conf.set(QueueManager.toFullPropertyName("qu7", submitAcl),"u2");
+    conf.set(QueueManager.toFullPropertyName("qu7", adminAcl),userName);
     //qu8 only current group has access
     StringBuilder groupNames = new StringBuilder("");
     String[] ugiGroupNames = currentUGI.getGroupNames();
@@ -92,9 +94,10 @@ public class TestQueueAclsForCurrentUser
         groupNames.append(",");
       }
     }
-    conf.set("mapred.queue.qu5.acl-submit-job"," "+groupNames.toString());
-    conf.set("mapred.queue.qu5.acl-administrator-jobs"," "
-            +groupNames.toString());
+    conf.set(QueueManager.toFullPropertyName("qu5", submitAcl),
+        " " + groupNames.toString());
+    conf.set(QueueManager.toFullPropertyName("qu5", adminAcl),
+        " " + groupNames.toString());
 
     queueManager = new QueueManager(conf);
   }
@@ -124,7 +127,7 @@ public class TestQueueAclsForCurrentUser
 
   private void checkQueueAclsInfo(QueueAclsInfo[] queueAclsInfoList)
           throws IOException {
-    if (conf.get("mapred.acls.enabled").equalsIgnoreCase("true")) {
+    if (conf.get(JobConf.MR_ACLS_ENABLED).equalsIgnoreCase("true")) {
       for (int i = 0; i < queueAclsInfoList.length; i++) {
         QueueAclsInfo acls = queueAclsInfoList[i];
         String queueName = acls.getQueueName();