Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/02/20 20:15:50 UTC

svn commit: r912196 - in /hadoop/mapreduce/trunk: ./ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/protocol/ src/java/org/apache/hadoop/mapreduce/tools/ src/test/mapred/org/apach...

Author: ddas
Date: Sat Feb 20 19:15:50 2010
New Revision: 912196

URL: http://svn.apache.org/viewvc?rev=912196&view=rev
Log:
MAPREDUCE-1307. Introduces the Job level ACLs feature. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Feb 20 19:15:50 2010
@@ -57,6 +57,9 @@
     MAPREDUCE-1341. Sqoop should have an option to create hive tables and
     skip the table import step. (Leonid Furman via tomwhite)
 
+    MAPREDUCE-1307. Introduces the Job level ACLs feature. 
+    (Vinod Kumar Vavilapalli via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Sat Feb 20 19:15:50 2010
@@ -931,6 +931,72 @@
 </property>
 
 <property>
+  <name>mapreduce.cluster.job-authorization-enabled</name>
+  <value>false</value>
+  <description> Boolean flag that specifies whether job-level authorization
+  checks should be enabled for jobs submitted to the cluster. Job-level
+  authorization is enabled if this flag is set to true and disabled otherwise.
+  It is disabled by default. If enabled, access control checks are made by the
+  JobTracker and the TaskTracker when users request to view the
+  job details (see mapreduce.job.acl-view-job) or to modify the job
+  (see mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
+  console and web user interfaces.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.acl-modify-job</name>
+  <value></value>
+  <description> Job-specific access-control list for 'modifying' the job. It
+    is only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    This specifies the list of users and/or groups who can do modification
+    operations on the job. For specifying a list of users and groups the
+    format to use is "user1,user2 group1,group2". If set to '*', it allows all
+    users/groups to modify this job. If set to '', it allows none. This
+    configuration guards all modifications to this job and covers the
+    following operations:
+      o killing this job
+      o killing a task of this job, failing a task of this job
+      o setting the priority of this job
+    Each of these operations is also protected by the per-queue level ACL
+    "acl-administer-jobs" configured via mapred-queues.xml. So a caller must
+    have authorization to satisfy both the queue-level ACL and the
+    job-level ACL.
+
+    Irrespective of this ACL configuration, the job-owner, the superuser and
+    members of the supergroup configured on the JobTracker via
+    mapred.permissions.supergroup can do all modification operations.
+
+    By default, nobody besides the job-owner and the superuser/supergroup
+    can perform modification operations on a job.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.acl-view-job</name>
+  <value></value>
+  <description> Job-specific access-control list for 'viewing' the job. It is
+    only used if authorization is enabled in Map/Reduce by setting the
+    configuration property mapreduce.cluster.job-authorization-enabled to true.
+    This specifies the list of users and/or groups who can view private details
+    about the job. For specifying a list of users and groups the
+    format to use is "user1,user2 group1,group2". If set to '*', it allows all
+    users/groups to view this job. If set to '', it allows none. This
+    configuration guards some of the job views and at present only
+    protects APIs that can return possibly sensitive information of the
+    job-owner, such as
+      o job-level counters
+      o task-level counters
+      o tasks' diagnostic information
+      o task-logs displayed on the TaskTracker web-UI and
+      o job.xml shown by the JobTracker's web-UI
+    Every other piece of information about jobs is still accessible to other
+    users, e.g., JobStatus, JobProfile, the list of jobs in the queue, etc.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.tasktracker.indexcache.mb</name>
   <value>10</value>
   <description> The maximum memory that a task tracker allows for the 

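For illustration, here is how the cluster-side switch and the two per-job ACLs
above fit together from code. This is only a sketch: the property names and the
MRConfig/JobContext constants are the ones added by this patch, but the class
name AclConfigSketch and the sample user/group lists are made up.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.MRConfig;

    public class AclConfigSketch {
      public static void main(String[] args) {
        // Cluster-side switch, read by the JobTracker from its own
        // configuration (see isJobLevelAuthorizationEnabled() below).
        // Both job ACLs are ignored unless this is true.
        JobConf jtConf = new JobConf();
        jtConf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);

        // Per-job ACLs set by the submitter: users before the space,
        // groups after it, as the descriptions above specify.
        JobConf jobConf = new JobConf();
        jobConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user2 group1,group2");
        jobConf.set(JobContext.JOB_ACL_MODIFY_JOB, "user1");
      }
    }
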
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Sat Feb 20 19:15:50 2010
@@ -27,7 +27,10 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.AccessControlException;
 
 /**
  * Persists and retrieves the Job info of a job into/from DFS.
@@ -46,13 +49,16 @@
   private FileSystem fs;
   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
 
+  private JobACLsManager jobACLsManager = null;
+
   public static final Log LOG =
           LogFactory.getLog(CompletedJobStatusStore.class);
 
   private static long HOUR = 1000 * 60 * 60;
   private static long SLEEP_TIME = 1 * HOUR;
 
-  CompletedJobStatusStore(Configuration conf) throws IOException {
+  CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+      throws IOException {
     active =
       conf.getBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
 
@@ -68,13 +74,21 @@
       // set the fs
       this.fs = path.getFileSystem(conf);
       if (!fs.exists(path)) {
-        fs.mkdirs(path);
+        if (!fs.mkdirs(path)) {
+          active = false;
+          LOG.warn("Couldn't create " + jobInfoDir
+              + ". CompletedJobStore will be inactive.");
+          return;
+        }
       }
 
       if (retainTime == 0) {
         // as retain time is zero, all stored jobstatuses are deleted.
         deleteJobStatusDirs();
       }
+
+      this.jobACLsManager = aclsManager;
+
       LOG.info("Completed job store activated/configured with retain-time : " 
                + retainTime + " , job-info-dir : " + jobInfoDir);
     } else {
@@ -275,18 +289,23 @@
    *
    * @param jobId the jobId for which Counters is queried
    * @return Counters object, null if not able to retrieve
+   * @throws AccessControlException 
    */
-  public Counters readCounters(JobID jobId) {
+  public Counters readCounters(JobID jobId) throws AccessControlException {
     Counters counters = null;
     if (active) {
       try {
         FSDataInputStream dataIn = getJobInfoFile(jobId);
         if (dataIn != null) {
-          readJobStatus(dataIn);
+          JobStatus jobStatus = readJobStatus(dataIn);
+          jobACLsManager.checkAccess(jobStatus,
+              UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
           readJobProfile(dataIn);
           counters = readCounters(dataIn);
           dataIn.close();
         }
+      } catch (AccessControlException ace) {
+        throw ace;
       } catch (IOException ex) {
         LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
       }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+@InterfaceAudience.Private
+public class JobACLsManager {
+
+  private JobTracker jobTracker = null;
+
+  public JobACLsManager(JobTracker tracker) {
+    jobTracker = tracker;
+  }
+
+  /**
+   * Construct the jobACLs from the configuration so that they can be kept
+   * in memory. If authorization is disabled on the JT, nothing is constructed
+   * and an empty map is returned.
+   * 
+   * @return JobACL to AccessControlList map.
+   */
+  Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
+    
+    Map<JobACL, AccessControlList> acls =
+      new HashMap<JobACL, AccessControlList>();
+
+    // Don't construct anything if authorization is disabled.
+    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+      return acls;
+    }
+
+    for (JobACL aclName : JobACL.values()) {
+      String aclConfigName = aclName.getAclName();
+      String aclConfigured = conf.get(aclConfigName);
+      if (aclConfigured == null) {
+        // If ACLs are not configured at all, we grant no access to anyone. So
+        // _only_ the job-owner and superuser/supergroup can operate on the job.
+        aclConfigured = "";
+      }
+      acls.put(aclName, new AccessControlList(aclConfigured));
+    }
+    return acls;
+  }
+
+  /**
+   * If authorization is enabled on the JobTracker, checks whether the user (in
+   * the callerUGI) is authorized to perform the operation specified by
+   * 'jobOperation' on the job.
+   * <ul>
+   * <li>The owner of the job can do any operation on the job</li>
+   * <li>The superuser/supergroup of the JobTracker is always permitted to do
+   * operations on any job.</li>
+   * <li>For all other users/groups job-acls are checked</li>
+   * </ul>
+   * 
+   * @param jobStatus
+   * @param callerUGI
+   * @param jobOperation
+   */
+  void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+      JobACL jobOperation) throws AccessControlException {
+
+    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+      return;
+    }
+
+    JobID jobId = jobStatus.getJobID();
+
+    // Check for superusers/supergroups
+    if (jobTracker.isSuperUserOrSuperGroup(callerUGI)) {
+      JobInProgress.LOG.info("superuser/supergroup "
+          + callerUGI.getShortUserName() + " trying to perform "
+          + jobOperation.toString() + " on " + jobId);
+      return;
+    }
+
+    // Job-owner is always part of all the ACLs
+    if (callerUGI.getShortUserName().equals(jobStatus.getUsername())) {
+      JobInProgress.LOG.info("Jobowner " + callerUGI.getShortUserName()
+          + " trying to perform " + jobOperation.toString() + " on "
+          + jobId);
+      return;
+    }
+
+    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
+    if (acl.isUserAllowed(callerUGI)) {
+      JobInProgress.LOG.info("Normal user " + callerUGI.getShortUserName()
+          + " trying to perform " + jobOperation.toString() + " on "
+          + jobId);
+      return;
+    }
+
+    throw new AccessControlException(callerUGI
+        + " not authorized for performing the operation "
+        + jobOperation.toString() + " on " + jobId + ". "
+        + jobOperation.toString() + " configured for this job : "
+        + acl.toString());
+  }
+}

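The checkAccess() above bottoms out in AccessControlList.isUserAllowed(). A
minimal, self-contained sketch of that last step, using the same
AccessControlList constructor that constructJobACLs() calls and the
createUserForTesting() helper the new tests use; the class name and the
user/group names are hypothetical:

    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class JobAclCheckSketch {
      public static void main(String[] args) {
        // "user1,user2 group1": users before the space, groups after it.
        AccessControlList acl = new AccessControlList("user1,user2 group1");

        UserGroupInformation listedUser =
            UserGroupInformation.createUserForTesting("user2", new String[] {});
        UserGroupInformation groupMember =
            UserGroupInformation.createUserForTesting("someone",
                new String[] { "group1" });
        UserGroupInformation outsider =
            UserGroupInformation.createUserForTesting("intruder",
                new String[] {});

        System.out.println(acl.isUserAllowed(listedUser));  // true
        System.out.println(acl.isUserAllowed(groupMember)); // true
        System.out.println(acl.isUserAllowed(outsider));    // false
      }
    }
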
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Feb 20 19:15:50 2010
@@ -48,6 +48,7 @@
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
@@ -72,15 +73,17 @@
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
 /*************************************************************
@@ -109,7 +112,6 @@
   JobStatus status;
   Path jobFile = null;
   Path localJobFile = null;
-  String user;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -228,6 +230,7 @@
 
   LocalFileSystem localFs;
   FileSystem fs;
+  String user;
   JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
@@ -425,6 +428,9 @@
     JobContext jobContext = new JobContextImpl(conf, jobId);
     this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
+    // Construct the jobACLs
+    status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
@@ -712,6 +718,25 @@
   }
 
   /**
+   * If authorization is enabled on the JobTracker, checks whether the user (in
+   * the callerUGI) is authorized to perform the operation specified by
+   * 'jobOperation' on the job.
+   * <ul>
+   * <li>The owner of the job can do any operation on the job</li>
+   * <li>The superuser/supergroup of the JobTracker is always permitted to do
+   * operations on any job.</li>
+   * <li>For all other users/groups job-acls are checked</li>
+   * </ul>
+   * 
+   * @param callerUGI
+   * @param jobOperation
+   */
+  void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
+      throws AccessControlException {
+    jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
+  }
+
+  /**
    * If the number of tasks is greater than the configured value
    * throw an exception that will fail job initialization
    */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Sat Feb 20 19:15:50 2010
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
 /**************************************************
  * Describes the current status of a job.  This is
  * not intended to be a comprehensive piece of data.
@@ -281,7 +286,11 @@
    protected synchronized void setSchedulingInfo(String schedulingInfo) {
      super.setSchedulingInfo(schedulingInfo);
    }
-   
+
+   protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+     super.setJobACLs(acls);
+   }
+
   /**
    * Set the priority of the job, defaulting to NORMAL.
    * @param jp new job priority

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Feb 20 19:15:50 2010
@@ -71,6 +71,7 @@
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -1252,6 +1253,7 @@
                                                 "expireLaunchingTasks");
 
   final CompletedJobStatusStore completedJobStatusStore;
+  private JobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   final RecoveryManager recoveryManager;
 
@@ -1587,8 +1589,10 @@
     this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS, 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
+    // Initialize the jobACLSManager
+    jobACLsManager = new JobACLsManager(this);
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf);
+    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2991,11 +2995,11 @@
       throw new IOException("Queue \"" + queue + "\" is not running");
     }
     try {
-      checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
-    } catch (IOException ioe) {
+      checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+    } catch (AccessControlException ace) {
       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
-          + ". Ignoring job " + jobId, ioe);
-      throw ioe;
+          + ". Ignoring job " + jobId, ace);
+      throw ace;
     }
 
     // Check the job if it cannot run in the cluster because of invalid memory
@@ -3046,30 +3050,56 @@
     return job.getStatus();
   }
 
-  // Check whether the specified operation can be performed
-  // related to the job.
-  private void checkAccess(JobInProgress job, 
-                                Queue.QueueOperation oper) 
-                                  throws IOException {
-    // get the user group info
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    checkAccess(job, oper, ugi);
+  /**
+   * Is job-level authorization enabled on the JT?
+   * 
+   * @return true if job-level authorization is enabled, false otherwise
+   */
+  boolean isJobLevelAuthorizationEnabled() {
+    return conf.getBoolean(
+        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
   }
 
-  // use the passed ugi for checking the access
-  private void checkAccess(JobInProgress job, Queue.QueueOperation oper,
-                           UserGroupInformation ugi) throws IOException {
-    // get the queue
+  /**
+   * Check the ACLs for a user doing the passed queue-operation and the passed
+   * job operation.
+   * <ul>
+   * <li>Superuser/supergroup can do any operation on the job</li>
+   * <li>For any other user/group, the configured ACLs for the corresponding
+   * queue and the job are checked.</li>
+   * </ul>
+   * 
+   * @param job
+   * @param callerUGI
+   * @param oper
+   * @param jobOperation
+   * @throws AccessControlException
+   * @throws IOException
+   */
+  private void checkAccess(JobInProgress job,
+      UserGroupInformation callerUGI, Queue.QueueOperation oper,
+      JobACL jobOperation) throws AccessControlException {
+
+    // get the queue and verify the queue access
     String queue = job.getProfile().getQueueName();
-    if (!queueManager.hasAccess(queue, job, oper, ugi)) {
+    if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
       throw new AccessControlException("User " 
-                            + ugi.getShortUserName() 
+                            + callerUGI.getShortUserName() 
                             + " cannot perform "
                             + "operation " + oper + " on queue " + queue +
                             ".\n Please run \"hadoop queue -showacls\" " +
                             "command to find the queues you have access" +
                             " to .");
     }
+
+    // check nulls; e.g., the submitJob RPC doesn't pass a jobOperation as the
+    // job itself isn't created by that time.
+    if (jobOperation == null) {
+      return;
+    }
+
+    // check the access to the job
+    job.checkAccess(callerUGI, jobOperation);
   }
 
   /**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3156,6 +3186,10 @@
     return info;
   }
 
+  /**
+   * @see ClientProtocol#killJob(org.apache.hadoop.mapreduce.JobID)
+   */
+  @Override
   public synchronized void killJob(org.apache.hadoop.mapreduce.JobID jobid) 
       throws IOException {
     killJob(JobID.downgrade(jobid));
@@ -3177,8 +3211,11 @@
       LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job");
       return;
     }
-        
-    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
+
+    // check both queue-level and job-level access
+    checkAccess(job, UserGroupInformation.getCurrentUser(),
+        Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
     killJob(job);
   }
 
@@ -3288,10 +3325,9 @@
   }
 
   /**
-   * Set the priority of a job
-   * @param jobid id of the job
-   * @param priority new priority of the job
+   * @see ClientProtocol#setJobPriority(org.apache.hadoop.mapreduce.JobID, String)
    */
+  @Override
   public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID 
       jobid, String priority) throws IOException {
     setJobPriority(JobID.downgrade(jobid), priority);
@@ -3313,7 +3349,6 @@
             + " is not a valid job");
         return;
     }
-    checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
     JobPriority newPriority = JobPriority.valueOf(priority);
     setJobPriority(jobid, newPriority);
   }
@@ -3341,7 +3376,11 @@
     }
     return completedJobStatusStore.readJobProfile(jobid);
   }
-  
+
+  /**
+   * see {@link ClientProtocol#getJobStatus(org.apache.hadoop.mapreduce.JobID)}
+   */
+  @Override
   public JobStatus getJobStatus(org.apache.hadoop.mapreduce.JobID jobid) {
     return getJobStatus(JobID.downgrade(jobid));
   }
@@ -3361,7 +3400,6 @@
       if (job != null) {
         return job.getStatus();
       } else {
-        
         JobStatus status = retireJobs.get(jobid);
         if (status != null) {
           return status;
@@ -3370,10 +3408,38 @@
     }
     return completedJobStatusStore.readJobStatus(jobid);
   }
-  
+
+  /**
+   * see
+   * {@link ClientProtocol#getJobCounters(org.apache.hadoop.mapreduce.JobID)}
+   * 
+   * @throws IOException
+   * @throws AccessControlException
+   */
+  @Override
   public org.apache.hadoop.mapreduce.Counters getJobCounters(
-      org.apache.hadoop.mapreduce.JobID jobid) {
-    Counters counters = getJobCounters(JobID.downgrade(jobid));
+      org.apache.hadoop.mapreduce.JobID jobid)
+      throws AccessControlException, IOException {
+
+    JobID oldJobID = JobID.downgrade(jobid);
+
+    synchronized (this) {
+      JobInProgress job = jobs.get(oldJobID);
+      if (job != null) {
+
+        // check the job-access
+        job.checkAccess(UserGroupInformation.getCurrentUser(),
+            JobACL.VIEW_JOB);
+
+        Counters counters = job.getCounters();
+        if (counters != null) {
+          return new org.apache.hadoop.mapreduce.Counters(counters);
+        }
+        return null;
+      } 
+    }
+
+    Counters counters = completedJobStatusStore.readCounters(oldJobID);
     if (counters != null) {
       return new org.apache.hadoop.mapreduce.Counters(counters);
     }
@@ -3386,13 +3452,14 @@
    */
   @Deprecated
   public Counters getJobCounters(JobID jobid) {
-    synchronized (this) {
-      JobInProgress job = jobs.get(jobid);
-      if (job != null) {
-        return job.getCounters();
-      } 
-    }
-    return completedJobStatusStore.readCounters(jobid);
+    try {
+      return Counters.downgrade(
+          getJobCounters((org.apache.hadoop.mapreduce.JobID) jobid));
+    } catch (AccessControlException e) {
+      return null;
+    } catch (IOException e) {
+      return null;
+    } 
   }
   
   /**
@@ -3514,8 +3581,26 @@
     }
   }
 
+  /**
+   * see
+   * {@link ClientProtocol#getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+   * @throws IOException 
+   * @throws AccessControlException 
+   */
+  @Override
   public synchronized TaskReport[] getTaskReports(
-      org.apache.hadoop.mapreduce.JobID jobid, TaskType type) {
+      org.apache.hadoop.mapreduce.JobID jobid, TaskType type)
+      throws AccessControlException, IOException {
+
+    // Check authorization
+    JobInProgress job = jobs.get(jobid);
+    if (job != null) {
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+    } else { 
+      return new TaskReport[0];
+    }
+
     switch (type) {
       case MAP :
         return getMapTaskReports(JobID.downgrade(jobid));
@@ -3573,6 +3658,7 @@
       throws IOException {
     return getTaskDiagnostics(TaskAttemptID.downgrade(taskId));
   }
+
   /**
    * Get the diagnostics for a given task
    * @param taskId the id of the task
@@ -3586,6 +3672,11 @@
     TaskID tipId = taskId.getTaskID();
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
+
+      // check the access to the job.
+      job.checkAccess(UserGroupInformation.getCurrentUser(),
+          JobACL.VIEW_JOB);
+
       TaskInProgress tip = job.getTaskInProgress(tipId);
       if (tip != null) {
         taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
@@ -3635,6 +3726,11 @@
     return (job == null ? null : job.getTaskInProgress(tipid));
   }
 
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#killTask(org.apache.hadoop.mapreduce.TaskAttemptID,
+   *      boolean)
+   */
+  @Override
   public synchronized boolean killTask(
       org.apache.hadoop.mapreduce.TaskAttemptID taskid,
       boolean shouldFail) throws IOException {
@@ -3647,7 +3743,11 @@
       boolean shouldFail) throws IOException {
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
-      checkAccess(tip.getJob(), Queue.QueueOperation.ADMINISTER_JOBS);
+
+      // check both queue-level and job-level access
+      checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
+          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
       return tip.killTask(taskid, shouldFail);
     }
     else {
@@ -3668,7 +3768,10 @@
   public JobStatus[] jobsToComplete() {
     return getJobStatus(jobs.values(), true);
   } 
-  
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getAllJobs()
+   */
   public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {
     List<JobStatus> list = new ArrayList<JobStatus>();
     list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
@@ -3733,12 +3836,21 @@
 
   /**
    * Change the run-time priority of the given job.
+   * 
    * @param jobId job id
    * @param priority new {@link JobPriority} for the job
+   * @throws IOException
+   * @throws AccessControlException
    */
-  synchronized void setJobPriority(JobID jobId, JobPriority priority) {
+  synchronized void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
     JobInProgress job = jobs.get(jobId);
     if (job != null) {
+
+      // check both queue-level and job-level access
+      checkAccess(job, UserGroupInformation.getCurrentUser(),
+          Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
       synchronized (taskScheduler) {
         JobStatus oldStatus = (JobStatus)job.getStatus().clone();
         job.setPriority(priority);
@@ -3920,16 +4032,14 @@
   }
   
   /**
-   * Is the current user a super user?
+   * Is the calling user a super user? Or part of the supergroup?
    * @return true, if it is a super user or a member of the supergroup
-   * @throws IOException if there are problems getting the current user
    */
-  private synchronized boolean isSuperUser() throws IOException {
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    if (mrOwner.getShortUserName().equals(ugi.getShortUserName()) ) {
+  boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
       return true;
     }
-    String[] groups = ugi.getGroupNames();
+    String[] groups = callerUGI.getGroupNames();
     for(int i=0; i < groups.length; ++i) {
       if (groups[i].equals(supergroup)) {
         return true;
@@ -3944,7 +4054,7 @@
    */
   public synchronized void refreshNodes() throws IOException {
     // check access
-    if (!isSuperUser()) {
+    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser())) {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -3954,6 +4064,10 @@
     refreshHosts();
   }
   
+  String getSuperGroup() {
+    return supergroup;
+  }
+  
   private synchronized void refreshHosts() throws IOException {
     // Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
     // Update the file names and refresh internal includes and excludes list
@@ -4457,8 +4571,11 @@
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
+    // Initialize the jobACLSManager
+    jobACLsManager = new JobACLsManager(this);
+
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf);
+    completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
   }
 
   /**
@@ -4510,4 +4627,8 @@
     String user = UserGroupInformation.getCurrentUser().getUserName();
     return secretManager.renewToken(token, user);
   }
+
+  JobACLsManager getJobACLsManager() {
+    return jobACLsManager;
+  }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.*;
+
+/**
+ * Job related ACLs
+ */
+@InterfaceAudience.Private
+public enum JobACL {
+
+  /**
+   * ACL for 'viewing' a job. Dictates who can 'view' some or all of the
+   * job-related details.
+   */
+  VIEW_JOB(JobContext.JOB_ACL_VIEW_JOB),
+
+  /**
+   * ACL for 'modifying' a job. Dictates who can 'modify' the job, e.g., by
+   * killing the job, killing/failing a task of the job or setting the
+   * priority of the job.
+   */
+  MODIFY_JOB(JobContext.JOB_ACL_MODIFY_JOB);
+
+  String aclName;
+
+  JobACL(String name) {
+    this.aclName = name;
+  }
+
+  /**
+   * Get the name of the ACL. Here it is the same as the name of the
+   * property for specifying the ACL for the job.
+   * 
+   * @return aclName
+   */
+  public String getAclName() {
+    return aclName;
+  }
+}

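Each enum constant simply wraps the configuration key that guards it, so the
mapping can be dumped in a couple of lines (a sketch; the class name is
hypothetical, the keys are the JobContext constants added below):

    import org.apache.hadoop.mapreduce.JobACL;

    public class JobAclNamesSketch {
      public static void main(String[] args) {
        for (JobACL acl : JobACL.values()) {
          // Prints: VIEW_JOB -> mapreduce.job.acl-view-job
          //         MODIFY_JOB -> mapreduce.job.acl-modify-job
          System.out.println(acl + " -> " + acl.getAclName());
        }
      }
    }
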
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Sat Feb 20 19:15:50 2010
@@ -229,7 +229,11 @@
     "mapreduce.reduce.merge.memtomem.enabled";
   public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
   public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
-  
+
+  public static final String JOB_ACL_VIEW_JOB =
+      "mapreduce.job.acl-view-job";
+  public static final String JOB_ACL_MODIFY_JOB =
+      "mapreduce.job.acl-modify-job";
 
   /**
    * Return the configuration for the job.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java Sat Feb 20 19:15:50 2010
@@ -20,12 +20,16 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.authorize.AccessControlList;
 
 /**************************************************
  * Describes the current status of a job.
@@ -73,6 +77,9 @@
   private JobPriority priority;
   private String schedulingInfo="NA";
 
+  private Map<JobACL, AccessControlList> jobACLs =
+      new HashMap<JobACL, AccessControlList>();
+
   private String jobName;
   private String jobFile;
   private long finishTime;
@@ -222,7 +229,11 @@
   protected synchronized void setSchedulingInfo(String schedulingInfo) {
     this.schedulingInfo = schedulingInfo;
   }
-  
+
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    this.jobACLs = acls;
+  }
+
   /**
    * @return Percentage of progress in maps 
    */
@@ -281,6 +292,10 @@
    return schedulingInfo;
   }
 
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    return jobACLs;
+  }
+
   /**
    * Return the priority of the job
    * @return job priority
@@ -316,6 +331,13 @@
     Text.writeString(out, jobName);
     Text.writeString(out, trackingUrl);
     Text.writeString(out, jobFile);
+
+    // Serialize the job's ACLs
+    out.writeInt(jobACLs.size());
+    for (Entry<JobACL, AccessControlList> entry : jobACLs.entrySet()) {
+      WritableUtils.writeEnum(out, entry.getKey());
+      Text.writeString(out, entry.getValue().toString());
+    }
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
@@ -336,6 +358,14 @@
     this.jobName = Text.readString(in);
     this.trackingUrl = Text.readString(in);
     this.jobFile = Text.readString(in);
+
+    // De-serialize the job's ACLs
+    int numACLs = in.readInt();
+    for (int i = 0; i < numACLs; i++) {
+      JobACL aclType = WritableUtils.readEnum(in, JobACL.class);
+      String acl = Text.readString(in);
+      this.jobACLs.put(aclType, new AccessControlList(acl));
+    }
   }
 
   /**

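The wire format appended to JobStatus is just a count followed by
(enum, ACL-string) pairs. A round-trip sketch of that fragment alone, using
Hadoop's standard DataOutputBuffer/DataInputBuffer (those buffer classes are
pre-existing Hadoop I/O, not part of this patch; the class name is
hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.mapreduce.JobACL;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class JobAclWireFormatSketch {
      public static void main(String[] args) throws IOException {
        // Serialize one (JobACL, AccessControlList) entry the way
        // JobStatus.write() now does.
        DataOutputBuffer out = new DataOutputBuffer();
        out.writeInt(1);
        WritableUtils.writeEnum(out, JobACL.VIEW_JOB);
        Text.writeString(out, new AccessControlList("user1 group1").toString());

        // De-serialize it back, mirroring JobStatus.readFields().
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        int numACLs = in.readInt();
        for (int i = 0; i < numACLs; i++) {
          JobACL aclType = WritableUtils.readEnum(in, JobACL.class);
          AccessControlList acl = new AccessControlList(Text.readString(in));
          System.out.println(aclType + " -> " + acl.toString());
        }
      }
    }
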
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java Sat Feb 20 19:15:50 2010
@@ -35,6 +35,8 @@
   public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
   public static final String REDUCEMEMORY_MB = 
     "mapreduce.cluster.reducememory.mb";
+  public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
+    "mapreduce.cluster.job-authorization-enabled";
 
   //Delegation token related keys
   public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Sat Feb 20 19:15:50 2010
@@ -101,9 +101,10 @@
    *             user home dir. JobTracker reads the required files from the
    *             staging area using user credentials passed via the rpc.
    * Version 31: Added TokenStorage to submitJob      
-   * Version 32: Added delegation tokens (add, renew, cancel)    
+   * Version 32: Added delegation tokens (add, renew, cancel)
+   * Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
    */
-  public static final long versionID = 32L;
+  public static final long versionID = 33L;
 
   /**
    * Allocate a name for the job.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java Sat Feb 20 19:15:50 2010
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TIPStatus;
 import org.apache.hadoop.mapreduce.Cluster;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -218,9 +220,9 @@
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
+          Counters counters = job.getCounters();
           System.out.println();
           System.out.println(job);
-          Counters counters = job.getCounters();
           if (counters != null) {
             System.out.println(counters);
           } else {
@@ -307,6 +309,13 @@
           exitCode = -1;
         }
       }
+    } catch (RemoteException re) {
+      IOException unwrappedException = re.unwrapRemoteException();
+      if (unwrappedException instanceof AccessControlException) {
+        System.out.println(unwrappedException.getMessage());
+      } else {
+        throw re;
+      }
     } finally {
       cluster.close();
     }

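For reference, the unwrap-and-inspect pattern added to CLI.java, shown in
isolation. The RemoteException(className, message) constructor and
unwrapRemoteException() are existing Hadoop RPC APIs; the fabricated message
and the class name UnwrapSketch are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.security.AccessControlException;

    public class UnwrapSketch {
      public static void main(String[] args) {
        // What a client sees when the JobTracker throws
        // AccessControlException across RPC.
        RemoteException re = new RemoteException(
            AccessControlException.class.getName(),
            "user2 not authorized for performing the operation VIEW_JOB ...");

        IOException unwrapped = re.unwrapRemoteException();
        if (unwrapped instanceof AccessControlException) {
          // Print the server's message instead of dumping a stack trace,
          // as the new catch block in CLI.java does.
          System.out.println(unwrapped.getMessage());
        }
      }
    }
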
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Sat Feb 20 19:15:50 2010
@@ -37,6 +37,7 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
@@ -550,8 +551,12 @@
 
   /**
    * Change the job's priority
+   * 
+   * @throws IOException
+   * @throws AccessControlException
    */
-  public void setJobPriority(JobID jobId, JobPriority priority) {
+  public void setJobPriority(JobID jobId, JobPriority priority)
+      throws AccessControlException, IOException {
     jobTracker.getJobTracker().setJobPriority(jobId, priority);
   }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java Sat Feb 20 19:15:50 2010
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -32,7 +34,12 @@
   static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-status-persistence");
-  
+
+  @Override
+  protected void setUp() throws Exception {
+    // Don't start anything by default
+  }
+
   private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
@@ -65,6 +72,7 @@
   }
 
   public void testNonPersistency() throws Exception {
+    startCluster(true, null);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj = jc.getJob(jobId);
@@ -80,8 +88,7 @@
     Properties config = new Properties();
     config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS, "true");
     config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
-    stopCluster();
-    startCluster(false, config);
+    startCluster(true, config);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj0 = jc.getJob(jobId);
@@ -113,7 +120,7 @@
    * Test if the completed job status is persisted to localfs.
    */
   public void testLocalPersistency() throws Exception {
-    FileSystem fs = FileSystem.getLocal(createJobConf());
+    FileSystem fs = FileSystem.getLocal(new JobConf());
     
     fs.delete(TEST_DIR, true);
     
@@ -122,8 +129,7 @@
     config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
     config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_DIR, 
                        fs.makeQualified(TEST_DIR).toString());
-    stopCluster();
-    startCluster(false, config);
+    startCluster(true, config);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj = jc.getJob(jobId);
@@ -134,4 +140,44 @@
     assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
     fs.delete(TEST_DIR, true);
   }
+
+  /**
+   * Verify that completed-job store is inactive if the jobinfo path is not
+   * writable.
+   * 
+   * @throws Exception
+   */
+  public void testJobStoreDisablingWithInvalidPath() throws Exception {
+    MiniMRCluster mr = null;
+    Path parent = new Path(TEST_DIR, "parent");
+    try {
+      FileSystem fs = FileSystem.getLocal(new JobConf());
+
+      if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
+        fail("Cannot delete TEST_DIR!");
+      }
+
+      if (fs.mkdirs(new Path(TEST_DIR, parent))) {
+        if (!(new File(parent.toUri().getPath()).setWritable(false, false))) {
+          fail("Cannot chmod parent!");
+        }
+      } else {
+        fail("Cannot create parent dir!");
+      }
+      JobConf config = new JobConf();
+      config.set(JTConfig.JT_PERSIST_JOBSTATUS, "true");
+      config.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+      config.set(JTConfig.JT_PERSIST_JOBSTATUS_DIR, new Path(parent,
+          "child").toUri().getPath());
+      mr = new MiniMRCluster(0, "file:///", 4, null, null, config);
+      assertFalse(
+          "CompletedJobStore is unexpectedly active!",
+          mr.getJobTrackerRunner().getJobTracker().completedJobStatusStore.isActive());
+    } finally {
+      new File(parent.toUri().getPath()).setWritable(true, false);
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java Sat Feb 20 19:15:50 2010
@@ -142,8 +142,18 @@
         .downgrade(jobID));
     tracker.initJob(jip);
     try {
-      tracker.killJob(jobID);
-      fail("current user is neither u1 nor in the administer group list");
+      final Configuration userConf =
+          new Configuration(miniMRCluster.createJobConf());
+      UserGroupInformation ugi =
+          UserGroupInformation.createUserForTesting("someRandomUser",
+              new String[] { "someRandomGroup" });
+      cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
+        public Cluster run() throws IOException {
+          return new Cluster(userConf);
+        }
+      });
+      cluster.getJob(jobID).killJob();
+      fail("user 'someRandomUser' is neither u1 nor in the administer group list");
     } catch (Exception e) {
       final Configuration userConf = new Configuration(miniMRCluster.createJobConf());
       UserGroupInformation ugi = 

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.After;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Verify the job-ACLs
+ * 
+ */
+public class TestJobACLs {
+
+  static final Log LOG = LogFactory.getLog(TestJobACLs.class);
+
+  private MiniMRCluster mr = null;
+
+  private static final Path TEST_DIR =
+      new Path(System.getProperty("test.build.data", "/tmp"),
+          TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
+              + "completed-job-store");
+
+  /**
+   * Start the cluster before running the actual test.
+   * 
+   * @throws IOException
+   */
+  @Before
+  public void setup() throws IOException {
+    // Start the cluster
+    startCluster(false);
+  }
+
+  private void startCluster(boolean reStart) throws IOException {
+    UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
+    JobConf conf = new JobConf();
+
+    // Enable job-level authorization
+    conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+
+    // Enable CompletedJobStore
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (!reStart) {
+      fs.delete(TEST_DIR, true);
+    }
+    conf.set(JTConfig.JT_PERSIST_JOBSTATUS_DIR,
+        fs.makeQualified(TEST_DIR).toString());
+    conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+    conf.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+
+    mr =
+        new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, MR_UGI, conf);
+  }
+
+  /**
+   * Kill the cluster after the test is done.
+   */
+  @After
+  public void tearDown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+
+  /**
+   * Test view-job-acl, modify-job-acl and acl persistence to the
+   * completed-jobs-store.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testACLS() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    verifyACLViewJob();
+    verifyACLModifyJob();
+    verifyACLPersistence();
+  }
+
+  /**
+   * Verify JobContext.JOB_ACL_VIEW_JOB
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void verifyACLViewJob() throws IOException, InterruptedException {
+
+    // Set the job up.
+    final Configuration myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user3");
+
+    // Submit the job as user1
+    Job job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Try operations as an unauthorized user.
+    verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+    // Try operations as an authorized user.
+    verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+
+    // Clean up the job
+    job.killJob();
+  }
+
+  private Job submitJobAsUser(final Configuration clusterConf, String user)
+      throws IOException, InterruptedException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting(user, new String[] {});
+    Job job = (Job) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        SleepJob sleepJob = new SleepJob();
+        sleepJob.setConf(clusterConf);
+        Job myJob = sleepJob.createJob(0, 0, 1000, 1000, 1000, 1000);
+        myJob.submit();
+        return myJob;
+      }
+    });
+    return job;
+  }
+
+  private void verifyViewJobAsAuthorizedUser(final Configuration myConf,
+      final JobID jobId, String authorizedUser) throws IOException,
+      InterruptedException {
+    UserGroupInformation authorizedUGI =
+        UserGroupInformation.createUserForTesting(authorizedUser,
+            new String[] {});
+    authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() throws Exception {
+        Job myJob = null;
+        try {
+          Cluster cluster = new Cluster(myConf);
+          myJob = cluster.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // getCounters should succeed for a user listed in the view-job ACL.
+        try {
+          myJob.getCounters();
+        } catch (IOException ioe) {
+          fail("getCounters failed for an authorized user: " + ioe);
+        }
+
+        // getTaskReports should succeed as well.
+        try {
+          myJob.getTaskReports(TaskType.JOB_CLEANUP);
+        } catch (IOException ioe) {
+          fail("getTaskReports failed for an authorized user: " + ioe);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyViewJobAsUnauthorizedUser(final Configuration myConf,
+      final JobID jobId, String unauthorizedUser) throws IOException,
+      InterruptedException {
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting(unauthorizedUser,
+            new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        Job myJob = null;
+        try {
+          Cluster cluster = new Cluster(myConf);
+          myJob = cluster.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
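+        // The server-side AccessControlException typically reaches the
+        // client wrapped in an IOException (Hadoop RPC's RemoteException),
+        // which is why these checks match on the message text rather than
+        // on the exception type.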
+        // getCounters should be rejected for a user absent from the ACL.
+        try {
+          myJob.getCounters();
+          fail("getCounters should have failed with AccessControlException");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        } catch (InterruptedException e) {
+          fail("Unexpectedly interrupted: " + e);
+        }
+
+        // getTaskReports should be rejected as well.
+        try {
+          myJob.getTaskReports(TaskType.JOB_SETUP);
+          fail("getTaskReports should have failed with AccessControlException");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        } catch (InterruptedException e) {
+          fail("Unexpectedly interrupted: " + e);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Verify JobContext.JOB_ACL_MODIFY_JOB
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void verifyACLModifyJob() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    // Set the job up.
+    final Configuration myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_MODIFY_JOB, "user1,user3");
+
+    // Submit the job as user1
+    Job job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Try operations as an unauthorized user.
+    verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+    // Try operations as an authorized user.
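+    // (user3 again: listed in the modify ACL but not the job owner.)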
+    verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+  }
+
+  private void verifyModifyJobAsAuthorizedUser(
+      final Configuration clusterConf, final JobID jobId,
+      String authorizedUser) throws IOException, InterruptedException {
+    UserGroupInformation authorizedUGI =
+        UserGroupInformation.createUserForTesting(authorizedUser,
+            new String[] {});
+    authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() throws Exception {
+        Job myJob = null;
+        try {
+          Cluster cluster = new Cluster(clusterConf);
+          myJob = cluster.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // setPriority should succeed for a user listed in the modify-job ACL.
+        try {
+          myJob.setPriority(JobPriority.HIGH);
+          assertEquals(JobPriority.HIGH, myJob.getPriority());
+        } catch (IOException ioe) {
+          fail("setPriority failed for an authorized user: " + ioe);
+        }
+
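+        // The kill below doubles as cleanup for the job submitted by
+        // verifyACLModifyJob().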
+        // killJob should succeed as well.
+        try {
+          myJob.killJob();
+        } catch (IOException ioe) {
+          fail("killJob failed for an authorized user: " + ioe);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyModifyJobAsUnauthorizedUser(
+      final Configuration clusterConf, final JobID jobId,
+      String unauthorizedUser) throws IOException, InterruptedException {
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting(unauthorizedUser,
+            new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        Job myJob = null;
+        try {
+          Cluster cluster = new Cluster(clusterConf);
+          myJob = cluster.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // killJob should be rejected for a user absent from the ACL.
+        try {
+          myJob.killJob();
+          fail("killJob should have failed with AccessControlException");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        } catch (InterruptedException e) {
+          fail("Unexpectedly interrupted: " + e);
+        }
+
+        // setPriority should be rejected as well.
+        try {
+          myJob.setPriority(JobPriority.HIGH);
+          fail("setPriority should have failed with AccessControlException");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        } catch (InterruptedException e) {
+          fail("Unexpectedly interrupted: " + e);
+        }
+
+        return null;
+      }
+    });
+  }
+
+  private void verifyACLPersistence() throws IOException,
+      InterruptedException {
+
+    // Set the job up.
+    final Configuration myConf = mr.createJobConf();
+    myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user2");
+
+    // Submit the job as user1
+    Job job = submitJobAsUser(myConf, "user1");
+
+    final JobID jobId = job.getID();
+
+    // Kill the job and wait until the kill completes, so that the job is
+    // written to the CompletedJobStore.
+    job.killJob();
+    while (job.getJobState() != JobStatus.State.KILLED) {
+      LOG.info("Waiting for the job to be killed successfully..");
+      Thread.sleep(200);
+    }
+
+    // Now kill the cluster, so that the job is 'forgotten'
+    tearDown();
+
+    // Re-start the cluster
+    startCluster(true);
+
+    final Configuration myNewJobConf = mr.createJobConf();
+    // Now verify view-job works off CompletedJobStore
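+    // (user2 comes from the persisted view ACL; success here shows the ACLs
+    // were restored from the store after the restart, not kept in memory.)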
+    verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+
+    // Only job counters are persisted in the completed-job store, so only
+    // getCounters() can be exercised here; task reports are not available.
+    UserGroupInformation unauthorizedUGI =
+        UserGroupInformation.createUserForTesting("user3", new String[] {});
+    unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+      @SuppressWarnings("null")
+      @Override
+      public Object run() {
+        Job myJob = null;
+        try {
+          Cluster cluster = new Cluster(myNewJobConf);
+          myJob = cluster.getJob(jobId);
+        } catch (Exception e) {
+          fail("Exception .." + e);
+        }
+
+        assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+            myJob);
+
+        // getCounters should still be rejected for a user absent from the
+        // persisted view-job ACL.
+        try {
+          myJob.getCounters();
+          fail("getCounters should have failed with AccessControlException");
+        } catch (IOException ioe) {
+          assertTrue(ioe.getMessage().contains("AccessControlException"));
+        } catch (InterruptedException e) {
+          fail("Unexpectedly interrupted: " + e);
+        }
+
+        return null;
+      }
+    });
+  }
+}