You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/03/02 14:49:54 UTC

svn commit: r918037 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/server/jobtracker/ src...

Author: vinodkv
Date: Tue Mar  2 13:49:52 2010
New Revision: 918037

URL: http://svn.apache.org/viewvc?rev=918037&view=rev
Log:
MAPREDUCE-1455. Authorization for servlets. Contributed by Ravi Gummadi.


Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
    hadoop/mapreduce/trunk/src/webapps/job/job_authorization_error.jsp
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskGraphServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
    hadoop/mapreduce/trunk/src/webapps/job/jobblacklistedtrackers.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobconf.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobdetails.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobfailures.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobqueue_details.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobtable.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobtasks.jsp
    hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
    hadoop/mapreduce/trunk/src/webapps/job/taskdetails.jsp
    hadoop/mapreduce/trunk/src/webapps/job/taskstats.jsp

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar  2 13:49:52 2010
@@ -63,6 +63,9 @@
     MAPREDUCE-1430. JobTracker automatically renews delegation tokens for jobs.
     (Boris Shkolnik via ddas)
 
+    MAPREDUCE-1455. Introduces job-level authorization for mapreduce servlets.
+    (Ravi Gummadi via vinodkv)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Tue Mar  2 13:49:52 2010
@@ -81,7 +81,7 @@
     // If the request has a set* param, handle that and redirect to the regular
     // view page so that the user won't resubmit the data if they hit refresh.
     boolean advancedView = request.getParameter("advanced") != null;
-    if (JSPUtil.privateActionsAllowed()
+    if (JSPUtil.privateActionsAllowed(jobTracker.conf)
         && request.getParameter("setPool") != null) {
       Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
       PoolManager poolMgr = null;
@@ -102,7 +102,7 @@
       response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
       return;
     }
-    if (JSPUtil.privateActionsAllowed()
+    if (JSPUtil.privateActionsAllowed(jobTracker.conf)
         && request.getParameter("setPriority") != null) {
       Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();      
       JobPriority priority = JobPriority.valueOf(request.getParameter(
@@ -261,7 +261,7 @@
               profile.getJobID(), profile.getJobID());
           out.printf("<td>%s</td>\n", profile.getUser());
           out.printf("<td>%s</td>\n", profile.getJobName());
-          if (JSPUtil.privateActionsAllowed()) {
+          if (JSPUtil.privateActionsAllowed(jobTracker.conf)) {
             out.printf("<td>%s</td>\n", generateSelect(scheduler
                 .getPoolManager().getPoolNames(), scheduler.getPoolManager()
                 .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JSPUtil.java Tue Mar  2 13:49:52 2010
@@ -19,12 +19,15 @@
 
 import java.io.IOException;
 import java.net.URLEncoder;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.jsp.JspWriter;
@@ -33,15 +36,21 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
 
 class JSPUtil {
-  private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
+  static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";
   
+  // This conf is not from jobtracker, not from tasktracker. So may not
+  // contain PRIVATE_ACTIONS_KEY set to true even if we set in conf object used
+  // by jobtracker. So use this conf with caution.
   public static final Configuration conf = new Configuration();
 
   //LRU based cache
@@ -52,6 +61,94 @@
     conf.getInt(JTConfig.JT_JOBHISTORY_CACHE_SIZE, 5);
 
   private static final Log LOG = LogFactory.getLog(JSPUtil.class);
+
+  /**
+   * Wraps the {@link JobInProgress} object and contains boolean for
+   * 'job view access' allowed or not.
+   * This class is only for usage by JSPs and Servlets.
+   */
+  static class JobWithViewAccessCheck {
+    private JobInProgress job = null;
+    
+    // true if user is authorized to view this job
+    private boolean isViewAllowed = true;
+
+    JobWithViewAccessCheck(JobInProgress job) {
+      this.job = job;
+    }
+
+    JobInProgress getJob() {
+      return job;
+    }
+
+    boolean isViewJobAllowed() {
+      return isViewAllowed;
+    }
+
+    void setViewAccess(boolean isViewAllowed) {
+      this.isViewAllowed = isViewAllowed;
+    }
+  }
+
+  /**
+   * Validates if current user can view the job.
+   * If user is not authorized to view the job, this method will modify the
+   * response and forwards to an error page and returns Job with
+   * viewJobAccess flag set to false.
+   * @return JobWithViewAccessCheck object(contains JobInProgress object and
+   *         viewJobAccess flag). Callers of this method will check the flag
+   *         and decide if view should be allowed or not. Job will be null if
+   *         the job with given jobid doesnot exist at the JobTracker.
+   */
+  public static JobWithViewAccessCheck checkAccessAndGetJob(JobTracker jt,
+      JobID jobid, HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    final JobInProgress job = jt.getJob(jobid);
+    JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
+
+    String user = request.getRemoteUser();
+    if (user != null && job != null && jt.isJobLevelAuthorizationEnabled()) {
+      final UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(user);
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          public Void run() throws IOException, ServletException {
+
+            // checks job view permission
+            job.checkAccess(ugi, JobACL.VIEW_JOB);
+            return null;
+          }
+        });
+      } catch (AccessControlException e) {
+        String errMsg = "User " + ugi.getShortUserName() +
+            " failed to view " + jobid + "!<br><br>" + e.getMessage() +
+            "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+        JSPUtil.setErrorAndForward(errMsg, request, response);
+        myJob.setViewAccess(false);
+      } catch (InterruptedException e) {
+        String errMsg = " Interrupted while trying to access " + jobid +
+        "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+        JSPUtil.setErrorAndForward(errMsg, request, response);
+        myJob.setViewAccess(false);
+      }
+    }
+    return myJob;
+  }
+
+  /**
+   * Sets error code SC_UNAUTHORIZED in response and forwards to
+   * error page which contains error message and a back link.
+   */
+  public static void setErrorAndForward(String errMsg,
+      HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    request.setAttribute("error.msg", errMsg);
+    RequestDispatcher dispatcher = request.getRequestDispatcher(
+        "/job_authorization_error.jsp");
+    response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+    dispatcher.forward(request, response);
+  }
+
   /**
    * Method used to process the request from the job page based on the 
    * request which it has received. For example like changing priority.
@@ -60,32 +157,102 @@
    * @param response HTTP response object.
    * @param tracker {@link JobTracker} instance
    * @throws IOException
+   * @throws InterruptedException
+   * @throws ServletException
+   * @return if user is authorized to perform the operation on all the
+   *         selected jobs or not
    */
-  public static void processButtons(HttpServletRequest request,
-      HttpServletResponse response, JobTracker tracker) throws IOException {
-
-    if (privateActionsAllowed() && request.getParameter("killJobs") != null) {
+  public static boolean processButtons(HttpServletRequest request,
+      HttpServletResponse response, final JobTracker tracker)
+      throws IOException, InterruptedException, ServletException {
+
+    String user = request.getRemoteUser();
+    if (privateActionsAllowed(tracker.conf) &&
+        request.getParameter("killJobs") != null) {
       String[] jobs = request.getParameterValues("jobCheckBox");
       if (jobs != null) {
+        boolean notAuthorized = false;
+        String errMsg = "User " + user
+            + " failed to kill the following job(s)!<br><br>";
         for (String job : jobs) {
-          tracker.killJob(JobID.forName(job));
+          final JobID jobId = JobID.forName(job);
+          if (user != null) {
+            UserGroupInformation ugi =
+              UserGroupInformation.createRemoteUser(user);
+            try {
+              ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws IOException{
+
+                  tracker.killJob(jobId);// checks job modify permission
+                  return null;
+                }
+              });
+            } catch(AccessControlException e) {
+              errMsg = errMsg.concat("<br>" + e.getMessage());
+              notAuthorized = true;
+              // We don't return right away so that we can try killing other
+              // jobs that are requested to be killed.
+              continue;
+            }
+          }
+          else {// no authorization needed
+            tracker.killJob(jobId);
+          }
+        }
+        if (notAuthorized) {// user is not authorized to kill some/all of jobs
+          errMsg = errMsg.concat(
+            "<br><hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>");
+          setErrorAndForward(errMsg, request, response);
+          return false;
         }
       }
     }
 
-    if (privateActionsAllowed()
-        && request.getParameter("changeJobPriority") != null) {
+    if (privateActionsAllowed(tracker.conf) &&
+        request.getParameter("changeJobPriority") != null) {
       String[] jobs = request.getParameterValues("jobCheckBox");
-
       if (jobs != null) {
-        JobPriority jobPri = JobPriority.valueOf(request
+        final JobPriority jobPri = JobPriority.valueOf(request
             .getParameter("setJobPriority"));
+        boolean notAuthorized = false;
+        String errMsg = "User " + user
+            + " failed to set priority for the following job(s)!<br><br>";
 
         for (String job : jobs) {
-          tracker.setJobPriority(JobID.forName(job), jobPri);
+          final JobID jobId = JobID.forName(job);
+          if (user != null) {
+            UserGroupInformation ugi = UserGroupInformation.
+                createRemoteUser(user);
+            try {
+              ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                public Void run() throws IOException{
+
+                  // checks job modify permission
+                  tracker.setJobPriority(jobId, jobPri);
+                  return null;
+                }
+              });
+            } catch(AccessControlException e) {
+              errMsg = errMsg.concat("<br>" + e.getMessage());
+              notAuthorized = true;
+              // We don't return right away so that we can try operating on
+              // other jobs.
+              continue;
+            }
+          }
+          else {// no authorization needed
+            tracker.setJobPriority(jobId, jobPri);
+          }
+        }
+        if (notAuthorized) {// user is not authorized to kill some/all of jobs
+          errMsg = errMsg.concat(
+            "<br><hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>");
+          setErrorAndForward(errMsg, request, response);
+          return false;
         }
       }
     }
+    return true;
   }
 
   /**
@@ -99,9 +266,10 @@
    * @throws IOException
    */
   public static String generateJobTable(String label, Collection<JobInProgress> jobs
-      , int refresh, int rowId) throws IOException {
+      , int refresh, int rowId, JobConf conf) throws IOException {
 
-    boolean isModifiable = label.equals("Running") && privateActionsAllowed();
+    boolean isModifiable = label.equals("Running") &&
+        privateActionsAllowed(conf);
     StringBuffer sb = new StringBuffer();
     
     sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
@@ -297,7 +465,7 @@
     }
   }
 
-  static final boolean privateActionsAllowed() {
+  static boolean privateActionsAllowed(JobConf conf) {
     return conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java Tue Mar  2 13:49:52 2010
@@ -20,6 +20,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.security.AccessControlException;
@@ -27,20 +29,21 @@
 import org.apache.hadoop.security.authorize.AccessControlList;
 
 @InterfaceAudience.Private
-public class JobACLsManager {
+public abstract class JobACLsManager {
 
-  private JobTracker jobTracker = null;
+  static final Log LOG = LogFactory.getLog(JobACLsManager.class);
 
-  public JobACLsManager(JobTracker tracker) {
-    jobTracker = tracker;
-  }
+  protected abstract boolean isJobLevelAuthorizationEnabled();
+
+  protected abstract boolean isSuperUserOrSuperGroup(
+      UserGroupInformation callerUGI);
 
   /**
    * Construct the jobACLs from the configuration so that they can be kept in
    * the memory. If authorization is disabled on the JT, nothing is constructed
    * and an empty map is returned.
    * 
-   * @return JobACl to AccessControlList map.
+   * @return JobACL to AccessControlList map.
    */
   Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
     
@@ -48,7 +51,7 @@
       new HashMap<JobACL, AccessControlList>();
 
     // Don't construct anything if authorization is disabled.
-    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+    if (!isJobLevelAuthorizationEnabled()) {
       return acls;
     }
 
@@ -66,13 +69,12 @@
   }
 
   /**
-   * If authorization is enabled on the JobTracker, checks whether the user (in
-   * the callerUGI) is authorized to perform the operation specify by
-   * 'jobOperation' on the job.
+   * If authorization is enabled, checks whether the user (in the callerUGI) is
+   * authorized to perform the operation specified by 'jobOperation' on the job.
    * <ul>
    * <li>The owner of the job can do any operation on the job</li>
-   * <li>The superuser/supergroup of the JobTracker is always permitted to do
-   * operations on any job.</li>
+   * <li>The superuser/supergroup is always permitted to do operations on any
+   * job.</li>
    * <li>For all other users/groups job-acls are checked</li>
    * </ul>
    * 
@@ -83,40 +85,65 @@
   void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
       JobACL jobOperation) throws AccessControlException {
 
-    if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+    JobID jobId = jobStatus.getJobID();
+    String jobOwner = jobStatus.getUsername();
+    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
+    checkAccess(jobId, callerUGI, jobOperation, jobOwner, acl);
+  }
+
+  /**
+   * If authorization is enabled, checks whether the user (in the callerUGI) is
+   * authorized to perform the operation specified by 'jobOperation' on the job.
+   * <ul>
+   * <li>The owner of the job can do any operation on the job</li>
+   * <li>The superuser/supergroup is always permitted to do operations on any
+   * job.</li>
+   * <li>For all other users/groups job-acls are checked</li>
+   * </ul>
+   * @param jobId
+   * @param callerUGI
+   * @param jobOperation
+   * @param jobOwner
+   * @param jobACL
+   * @throws AccessControlException
+   */
+  void checkAccess(JobID jobId, UserGroupInformation callerUGI,
+      JobACL jobOperation, String jobOwner, AccessControlList jobACL)
+      throws AccessControlException {
+
+    if (!isJobLevelAuthorizationEnabled()) {
       return;
     }
 
-    JobID jobId = jobStatus.getJobID();
-
     // Check for superusers/supergroups
-    if (jobTracker.isSuperUserOrSuperGroup(callerUGI)) {
-      JobInProgress.LOG.info("superuser/supergroup "
+    if (isSuperUserOrSuperGroup(callerUGI)) {
+      LOG.info("superuser/supergroupMember "
           + callerUGI.getShortUserName() + " trying to perform "
           + jobOperation.toString() + " on " + jobId);
       return;
     }
 
     // Job-owner is always part of all the ACLs
-    if (callerUGI.getShortUserName().equals(jobStatus.getUsername())) {
-      JobInProgress.LOG.info("Jobowner " + callerUGI.getShortUserName()
+    if (callerUGI.getShortUserName().equals(jobOwner)) {
+      LOG.info("Jobowner " + callerUGI.getShortUserName()
           + " trying to perform " + jobOperation.toString() + " on "
           + jobId);
       return;
     }
 
-    AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
-    if (acl.isUserAllowed(callerUGI)) {
-      JobInProgress.LOG.info("Normal user " + callerUGI.getShortUserName()
+    
+    if (jobACL.isUserAllowed(callerUGI)) {
+      LOG.info("Normal user " + callerUGI.getShortUserName()
           + " trying to perform " + jobOperation.toString() + " on "
           + jobId);
       return;
     }
 
     throw new AccessControlException(callerUGI
-        + " not authorized for performing the operation "
+        + " is not authorized for performing the operation "
         + jobOperation.toString() + " on " + jobId + ". "
-        + jobOperation.toString() + " configured for this job : "
-        + acl.toString());
+        + jobOperation.toString()
+        + " Access control list configured for this job : "
+        + jobACL.toString());
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Mar  2 13:49:52 2010
@@ -797,7 +797,7 @@
   }
 
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
-    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
   }
   
   static Configuration getConfiguration(String jobTrackerSpec)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  2 13:49:52 2010
@@ -1107,7 +1107,7 @@
         this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
       TaskTrackerStatus ttStatus = 
         (taskTracker == null) ? null : taskTracker.getStatus();
-      String httpTaskLogLocation = null; 
+      String taskTrackerHttpLocation = null; 
 
       if (null != ttStatus){
         String host;
@@ -1116,8 +1116,8 @@
         } else {
           host = ttStatus.getHost();
         }
-        httpTaskLogLocation = "http://" + host + ":" + ttStatus.getHttpPort(); 
-           //+ "/tasklog?plaintext=true&taskid=" + status.getTaskID();
+        taskTrackerHttpLocation = "http://" + host + ":"
+            + ttStatus.getHttpPort(); 
       }
 
       TaskCompletionEvent taskEvent = null;
@@ -1130,7 +1130,7 @@
                                             !tip.isJobCleanupTask() &&
                                             !tip.isJobSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
-                                            httpTaskLogLocation 
+                                            taskTrackerHttpLocation 
                                            );
         taskEvent.setTaskRunTime((int)(status.getFinishTime() 
                                        - status.getStartTime()));
@@ -1186,7 +1186,7 @@
                                             !tip.isJobCleanupTask() &&
                                             !tip.isJobSetupTask(),
                                             taskCompletionStatus, 
-                                            httpTaskLogLocation
+                                            taskTrackerHttpLocation
                                            );
       }          
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Mar  2 13:49:52 2010
@@ -1253,7 +1253,7 @@
                                                 "expireLaunchingTasks");
 
   final CompletedJobStatusStore completedJobStatusStore;
-  private JobACLsManager jobACLsManager;
+  private JobTrackerJobACLsManager jobACLsManager;
   Thread completedJobsStoreThread = null;
   final RecoveryManager recoveryManager;
 
@@ -1351,7 +1351,7 @@
       mrOwner = UserGroupInformation.getCurrentUser();
     }
     
-    supergroup = conf.get(JT_SUPERGROUP, "supergroup");
+    supergroup = conf.get(MR_SUPERGROUP, "supergroup");
     LOG.info("Starting jobtracker with owner as " + mrOwner.getShortUserName() 
              + " and supergroup as " + supergroup);
     clock = newClock;
@@ -1590,7 +1590,7 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     // Initialize the jobACLSManager
-    jobACLsManager = new JobACLsManager(this);
+    jobACLsManager = new JobTrackerJobACLsManager(this);
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
   }
@@ -4035,13 +4035,14 @@
    * Is the calling user a super user? Or part of the supergroup?
    * @return true, if it is a super user
    */
-  boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
-    if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
+  static boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI,
+      UserGroupInformation superUser, String superGroup) {
+    if (superUser.getShortUserName().equals(callerUGI.getShortUserName())) {
       return true;
     }
     String[] groups = callerUGI.getGroupNames();
     for(int i=0; i < groups.length; ++i) {
-      if (groups[i].equals(supergroup)) {
+      if (groups[i].equals(superGroup)) {
         return true;
       }
     }
@@ -4054,7 +4055,8 @@
    */
   public synchronized void refreshNodes() throws IOException {
     // check access
-    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser())) {
+    if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser(), mrOwner,
+                                 supergroup)) {
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       throw new AccessControlException(user + 
                                        " is not authorized to refresh nodes.");
@@ -4064,6 +4066,10 @@
     refreshHosts();
   }
   
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
   String getSuperGroup() {
     return supergroup;
   }
@@ -4489,7 +4495,7 @@
     } else {
       mrOwner = UserGroupInformation.getCurrentUser();
     }
-    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    supergroup = conf.get(MRConfig.MR_SUPERGROUP, "supergroup");
     
     secretManager = null;
     
@@ -4572,7 +4578,7 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     // Initialize the jobACLSManager
-    jobACLsManager = new JobACLsManager(this);
+    jobACLsManager = new JobTrackerJobACLsManager(this);
 
     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java?rev=918037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTrackerJobACLsManager.java Tue Mar  2 13:49:52 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Manages the job ACLs and the operations on them at JobTracker.
+ *
+ */
+@InterfaceAudience.Private
+public class JobTrackerJobACLsManager extends JobACLsManager {
+
+  static final Log LOG = LogFactory.getLog(JobTrackerJobACLsManager.class);
+
+  private JobTracker jobTracker = null;
+
+  public JobTrackerJobACLsManager(JobTracker tracker) {
+    jobTracker = tracker;
+  }
+
+  @Override
+  protected boolean isJobLevelAuthorizationEnabled() {
+    return jobTracker.isJobLevelAuthorizationEnabled();
+  }
+
+  @Override
+  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
+        jobTracker.getMROwner(), jobTracker.getSuperGroup());
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskGraphServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskGraphServlet.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskGraphServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskGraphServlet.java Tue Mar  2 13:49:52 2010
@@ -19,12 +19,14 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.mapred.JSPUtil.JobWithViewAccessCheck;
+import org.apache.hadoop.security.UserGroupInformation;
+
 /** The servlet that outputs svg graphics for map / reduce task
  *  statuses
  */
@@ -52,13 +54,20 @@
 
     response.setContentType("image/svg+xml");
 
-    JobTracker tracker = 
+    final JobTracker tracker = 
       (JobTracker) getServletContext().getAttribute("job.tracker");
     
     String jobIdStr = request.getParameter("jobid");
     if(jobIdStr == null)
       return;
-    JobID jobId = JobID.forName(jobIdStr);
+    final JobID jobId = JobID.forName(jobIdStr);
+
+    // verify if user has view access for this job
+    JobWithViewAccessCheck myJob = JSPUtil.checkAccessAndGetJob(
+        tracker, jobId, request, response);
+    if (!myJob.isViewJobAllowed()) {
+      return;// user is not authorized to view this job
+    }
 
     final boolean isMap = "map".equalsIgnoreCase(request.getParameter("type"));
     final TaskReport[] reports = isMap? tracker.getMapTaskReports(jobId) 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  2 13:49:52 2010
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Tue Mar  2 13:49:52 2010
@@ -22,11 +22,19 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -50,7 +58,7 @@
   public static String getTaskLogUrl(String taskTrackerHostName,
       String httpPort, String taskAttemptID) {
     return ("http://" + taskTrackerHostName + ":" + httpPort
-        + "/tasklog?taskid=" + taskAttemptID);
+        + "/tasklog?attemptid=" + taskAttemptID);
   }
 
   /**
@@ -150,6 +158,42 @@
   }
 
   /**
+   * Validates if the given user has job view permissions for this job.
+   * conf contains jobOwner and job-view-ACLs.
+   * We allow jobOwner, superUser(i.e. mrOwner) and members of superGroup and
+   * users and groups specified in configuration using
+   * mapreduce.job.acl-view-job to view job.
+   */
+  private void checkAccessForTaskLogs(JobConf conf, String user, JobID jobId,
+      TaskTracker tracker) throws AccessControlException {
+
+    if (!tracker.isJobLevelAuthorizationEnabled()) {
+      return;
+    }
+
+    // buiild job view acl by reading from conf
+    AccessControlList jobViewACL = tracker.getJobACLsManager().
+        constructJobACLs(conf).get(JobACL.VIEW_JOB);
+
+    String jobOwner = conf.get(JobContext.USER_NAME);
+    UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(user);
+
+    tracker.getJobACLsManager().checkAccess(jobId, callerUGI, JobACL.VIEW_JOB,
+        jobOwner, jobViewACL);
+  }
+
+  /**
+   * Builds a Configuration object by reading the xml file.
+   * This doesn't load the default resources.
+   */
+  static Configuration getConfFromJobACLsFile(String attemptIdStr) {
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(TaskLog.getBaseDir(attemptIdStr).toString(),
+        TaskRunner.jobACLsFile));
+    return conf;
+  }
+
+  /**
    * Get the logs via http.
    */
   @Override
@@ -162,13 +206,35 @@
     TaskLog.LogName filter = null;
     boolean isCleanup = false;
 
-    String taskIdStr = request.getParameter("taskid");
-    if (taskIdStr == null) {
+    String attemptIdStr = request.getParameter("attemptid");
+    if (attemptIdStr == null) {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST, 
-                         "Argument taskid is required");
+                         "Argument attemptid is required");
       return;
     }
-    TaskAttemptID taskId = TaskAttemptID.forName(taskIdStr);
+
+    TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdStr);
+
+    // get user name who is accessing
+    String user = request.getRemoteUser();
+    if (user != null) {
+      // get jobACLConf from ACLs file
+      JobConf jobACLConf = new JobConf(getConfFromJobACLsFile(attemptIdStr));
+      ServletContext context = getServletContext();
+      TaskTracker taskTracker = (TaskTracker) context.getAttribute(
+          "task.tracker");
+      JobID jobId = attemptId.getJobID();
+
+      try {
+        checkAccessForTaskLogs(jobACLConf, user, jobId, taskTracker);
+      } catch (AccessControlException e) {
+        String errMsg = "User " + user + " failed to view tasklogs of job " +
+            jobId + "!\n\n" + e.getMessage();
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED, errMsg);
+        return;
+      }
+    }
+
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {
@@ -204,27 +270,27 @@
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
-                 "<title>Task Logs: '" + taskId + "'</title>\n" +
+                 "<title>Task Logs: '" + attemptId + "'</title>\n" +
                  "<body>\n" +
-                 "<h1>Task Logs: '" +  taskId +  "'</h1><br>\n").getBytes()); 
+                 "<h1>Task Logs: '" +  attemptId +  "'</h1><br>\n").getBytes()); 
 
       if (filter == null) {
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.STDOUT, isCleanup);
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.STDERR, isCleanup);
-        printTaskLog(response, out, taskId, start, end, plainText, 
+        printTaskLog(response, out, attemptId, start, end, plainText, 
                      TaskLog.LogName.SYSLOG, isCleanup);
-        if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
-          printTaskLog(response, out, taskId, start, end, plainText, 
+        if (haveTaskLog(attemptId, TaskLog.LogName.DEBUGOUT)) {
+          printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.DEBUGOUT, isCleanup);
         }
-        if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
-          printTaskLog(response, out, taskId, start, end, plainText, 
+        if (haveTaskLog(attemptId, TaskLog.LogName.PROFILE)) {
+          printTaskLog(response, out, attemptId, start, end, plainText, 
                        TaskLog.LogName.PROFILE, isCleanup);
         }
       } else {
-        printTaskLog(response, out, taskId, start, end, plainText, filter,
+        printTaskLog(response, out, attemptId, start, end, plainText, filter,
                      isCleanup);
       }
       
@@ -234,7 +300,7 @@
       response.sendError(HttpServletResponse.SC_BAD_REQUEST,
           "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
     } else {
-      printTaskLog(response, out, taskId, start, end, plainText, filter, 
+      printTaskLog(response, out, attemptId, start, end, plainText, filter, 
                    isCleanup);
     } 
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Mar  2 13:49:52 2010
@@ -19,6 +19,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -33,6 +34,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
@@ -45,7 +47,6 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
@@ -81,6 +82,8 @@
    */
   protected MapOutputFile mapOutputFile;
 
+  static String jobACLsFile = "job-acl.xml";
+
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
       JobConf conf) {
     this.tip = tip;
@@ -279,8 +282,9 @@
    * 
    * @param taskid
    * @return an array of files. The first file is stdout, the second is stderr.
+   * @throws IOException 
    */
-  static File[] prepareLogFiles(TaskAttemptID taskid) {
+  File[] prepareLogFiles(TaskAttemptID taskid) throws IOException {
     File[] logFiles = new File[2];
     logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
@@ -292,9 +296,34 @@
       Localizer.PermissionsHandler.setPermissions(logDir,
           Localizer.PermissionsHandler.sevenZeroZero);
     }
+    // write job acls into a file to know the access for task logs
+    writeJobACLs(logDir);
     return logFiles;
   }
 
+  // Writes job-view-acls and user name into an xml file
+  private void writeJobACLs(File logDir) throws IOException {
+    File aclFile = new File(logDir, TaskRunner.jobACLsFile);
+    Configuration aclConf = new Configuration(false);
+
+    // set the job view acls in aclConf
+    String jobViewACLs = conf.get(JobContext.JOB_ACL_VIEW_JOB);
+    if (jobViewACLs != null) {
+      aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+    }
+    // set jobOwner as mapreduce.job.user.name in aclConf
+    String jobOwner = conf.getUser();
+    aclConf.set(JobContext.USER_NAME, jobOwner);
+    FileOutputStream out = new FileOutputStream(aclFile);
+    try {
+      aclConf.writeXml(out);
+    } finally {
+      out.close();
+    }
+    Localizer.PermissionsHandler.setPermissions(aclFile,
+        Localizer.PermissionsHandler.sevenZeroZero);
+  }
+
   /**
    * Write the child's configuration to the disk and set it in configuration so
    * that the child can pick it up from there.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  2 13:49:52 2010
@@ -241,6 +241,10 @@
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+
+  // MROwner's ugi
+  private UserGroupInformation mrOwner;
+  private String supergroup;
   
   // Performance-related config knob to send an out-of-band heartbeat
   // on task completion
@@ -263,6 +267,9 @@
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
   private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
+  // Manages job acls of jobs in TaskTracker
+  private TaskTrackerJobACLsManager jobACLsManager;
+
   /**
    * the minimum interval between jobtracker polls
    */
@@ -562,18 +569,22 @@
    */
   synchronized void initialize() throws IOException, InterruptedException {
     String keytabFilename = fConf.get(TTConfig.TT_KEYTAB_FILE);
-    UserGroupInformation ttUgi;
     UserGroupInformation.setConfiguration(fConf);
     if (keytabFilename != null) {
       String desiredUser = fConf.get(TTConfig.TT_USER_NAME,
                                     System.getProperty("user.name"));
       UserGroupInformation.loginUserFromKeytab(desiredUser, 
                                                keytabFilename);
-      ttUgi = UserGroupInformation.getLoginUser();
+      mrOwner = UserGroupInformation.getLoginUser();
       
     } else {
-      ttUgi = UserGroupInformation.getCurrentUser();
+      mrOwner = UserGroupInformation.getCurrentUser();
     }
+
+    supergroup = fConf.get(MRConfig.MR_SUPERGROUP, "supergroup");
+    LOG.info("Starting tasktracker with owner as " + mrOwner.getShortUserName()
+             + " and supergroup as " + supergroup);
+
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
     if (fConf.get(TT_HOST_NAME) != null) {
@@ -671,7 +682,7 @@
         asyncDiskService);
 
     this.jobClient = (InterTrackerProtocol) 
-    ttUgi.doAs(new PrivilegedExceptionAction<Object>() {
+    mrOwner.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws IOException {
         return RPC.waitForProxy(InterTrackerProtocol.class,
             InterTrackerProtocol.versionID, 
@@ -715,6 +726,22 @@
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
+  UserGroupInformation getMROwner() {
+    return mrOwner;
+  }
+
+  String getSuperGroup() {
+    return supergroup;
+  }
+  
+  /**
+   * Is job level authorization enabled on the TT ?
+   */
+  boolean isJobLevelAuthorizationEnabled() {
+    return fConf.getBoolean(
+        MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
+  }
+
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
     Configuration conf) {
     return conf.getClass(TT_INSTRUMENTATION,
@@ -1232,10 +1259,13 @@
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
-    server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
+    
+    // Initialize the jobACLSManager
+    jobACLsManager = new TaskTrackerJobACLsManager(this);
     initialize();
   }
 
@@ -3869,4 +3899,7 @@
       return localJobTokenFileStr;
     }
 
+    TaskTrackerJobACLsManager getJobACLsManager() {
+      return jobACLsManager;
+    }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java?rev=918037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerJobACLsManager.java Tue Mar  2 13:49:52 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Manages the job ACLs and the operations on them at TaskTracker.
+ *
+ */
+@InterfaceAudience.Private
+public class TaskTrackerJobACLsManager extends JobACLsManager {
+
+  static final Log LOG = LogFactory.getLog(TaskTrackerJobACLsManager.class);
+
+  private TaskTracker taskTracker = null;
+
+  public TaskTrackerJobACLsManager(TaskTracker tracker) {
+    taskTracker = tracker;
+  }
+
+  @Override
+  protected boolean isJobLevelAuthorizationEnabled() {
+    return taskTracker.isJobLevelAuthorizationEnabled();
+  }
+
+  @Override
+  protected boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+    return JobTracker.isSuperUserOrSuperGroup(callerUGI,
+        taskTracker.getMROwner(), taskTracker.getSuperGroup());
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Tue Mar  2 13:49:52 2010
@@ -1176,7 +1176,7 @@
   }
   
   private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
-    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
   }
 
   /** The interval at which monitorAndPrintJob() prints status */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java Tue Mar  2 13:49:52 2010
@@ -37,6 +37,8 @@
     "mapreduce.cluster.reducememory.mb";
   public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG = 
     "mapreduce.cluster.job-authorization-enabled";
+  public static final String MR_SUPERGROUP =
+    "mapreduce.cluster.permissions.supergroup";
 
   //Delegation token related keys
   public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Mar  2 13:49:52 2010
@@ -49,6 +49,11 @@
     "mapreduce.jobtracker.persist.jobstatus.hours";
   public static final String JT_PERSIST_JOBSTATUS_DIR = 
     "mapreduce.jobtracker.persist.jobstatus.dir";
+
+  /**
+   * @deprecated Use MR_SUPERGROUP instead
+   */
+  @Deprecated
   public static final String JT_SUPERGROUP = 
     "mapreduce.jobtracker.permissions.supergroup";
   public static final String JT_RETIREJOBS = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java Tue Mar  2 13:49:52 2010
@@ -430,7 +430,7 @@
   }
 
   protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
-    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
   }
   
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Tue Mar  2 13:49:52 2010
@@ -100,7 +100,9 @@
     Configuration.addDeprecation("mapred.job.tracker.persist.jobstatus.dir", 
       new String[] {JTConfig.JT_PERSIST_JOBSTATUS_DIR});
     Configuration.addDeprecation("mapred.permissions.supergroup", 
-      new String[] {JTConfig.JT_SUPERGROUP});
+      new String[] {MRConfig.MR_SUPERGROUP});
+    Configuration.addDeprecation("mapreduce.jobtracker.permissions.supergroup",
+        new String[] {MRConfig.MR_SUPERGROUP});
     Configuration.addDeprecation("mapred.task.cache.levels", 
       new String[] {JTConfig.JT_TASKCACHE_LEVELS});
     Configuration.addDeprecation("mapred.jobtracker.taskalloc.capacitypad", 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Tue Mar  2 13:49:52 2010
@@ -82,7 +82,8 @@
 
       ConfigurableMiniMRCluster.setConfiguration(props);
       //noinspection deprecation
-      mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getUri().toString(), 1);
+      mrCluster = new ConfigurableMiniMRCluster(2,
+          getFileSystem().getUri().toString(), 1, conf);
     }
   }
 
@@ -94,8 +95,9 @@
     }
 
     public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
-                                     int numDir) throws Exception {
-      super(numTaskTrackers, namenode, numDir);
+                                     int numDir, JobConf conf)
+        throws Exception {
+      super(0,0, numTaskTrackers, namenode, numDir, null, null, null, conf);
     }
 
     public JobConf createJobConf() {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Mar  2 13:49:52 2010
@@ -42,6 +42,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -215,7 +216,7 @@
     // start a cluster with 1 host and specified superuser and supergroup
     Configuration conf = new Configuration();
     // set the supergroup
-    conf.set(JTConfig.JT_SUPERGROUP, "abc");
+    conf.set(MRConfig.MR_SUPERGROUP, "abc");
     startCluster(2, 1, 0, UserGroupInformation.createRemoteUser("user1"), conf);
 
     conf = mr.createJobConf(new JobConf(conf));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=918037&r1=918036&r2=918037&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Mar  2 13:49:52 2010
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -507,6 +508,11 @@
     localizedJobConf = tracker.localizeJobFiles(task, 
         new TaskTracker.RunningJob(task.getJobID()));
 
+    // Set job view ACLs in conf sothat validation of contents of jobACLsFile
+    // can be done against this value. Have both users and groups
+    String jobViewACLs = "user1,user2, group1,group2";
+    localizedJobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
+
     // Now initialize the job via task-controller so as to set
     // ownership/permissions of jars, job-work-dir
     JobInitializationContext jobContext = new JobInitializationContext();
@@ -546,7 +552,7 @@
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
         localizedJobConf);
-    attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+    attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
 
     // Make sure the task-conf file is created
     Path localTaskFile =
@@ -616,6 +622,24 @@
         + expectedStderr.toString() + " Observed : "
         + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
         attemptLogFiles[1].toString()));
+
+    // Make sure that the job ACLs file exists in the task log dir
+    File jobACLsFile = new File(logDir, TaskRunner.jobACLsFile);
+    assertTrue("JobACLsFile is missing in the task log dir " + logDir,
+        jobACLsFile.exists());
+
+    // With default task controller, the job-acls file is owned by TT and
+    // permissions are 700
+    checkFilePermissions(jobACLsFile.getAbsolutePath(), "-rwx------",
+        taskTrackerUGI.getShortUserName(), taskTrackerUGI.getGroupNames()[0]);
+
+    // Validate the contents of jobACLsFile(both user name and job-view-acls)
+    Configuration jobACLsConf =
+        TaskLogServlet.getConfFromJobACLsFile(task.getTaskID().toString());
+    assertTrue(jobACLsConf.get(JobContext.USER_NAME).equals(
+        localizedJobConf.getUser()));
+    assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB).
+        equals(localizedJobConf.get(JobContext.JOB_ACL_VIEW_JOB)));
   }
 
   /**
@@ -736,7 +760,7 @@
     runner.setupChildTaskConfiguration(lDirAlloc);
     TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
         localizedJobConf);
-    TaskRunner.prepareLogFiles(task.getTaskID());
+    runner.prepareLogFiles(task.getTaskID());
     Path localTaskFile =
         lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
             .getUser(), task.getJobID().toString(), task.getTaskID()

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=918037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestWebUIAuthorization.java Tue Mar  2 13:49:52 2010
@@ -0,0 +1,604 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.HttpURLConnection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.http.TestHttpServer.DummyFilterInitializer;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestWebUIAuthorization.class);
+
+  // user1 submits the jobs
+  private static final String jobSubmitter = "user1";
+  // mrOwner starts the cluster
+  private static String mrOwner = null;
+  // member of supergroup
+  private static final String superGroupMember = "user2";
+  // "colleague1" is there in job-view-acls config
+  private static final String viewColleague = "colleague1";
+  // "colleague2" is there in job-modify-acls config
+  private static final String modifyColleague = "colleague2";
+  // "colleague3" is there in both job-view-acls and job-modify-acls
+  private static final String viewAndModifyColleague = "colleague3";
+  // "evilJohn" is not having view/modify access on the jobs
+  private static final String unauthorizedUser = "evilJohn";
+
+  protected void setUp() throws Exception {
+    // do not do anything
+  };
+
+  /** access a url, ignoring some IOException such as the page does not exist */
+  static int getHttpStatusCode(String urlstring, String userName,
+      String method) throws IOException {
+    LOG.info("Accessing " + urlstring + " as user " + userName);
+    URL url = new URL(urlstring + "&user.name=" + userName);
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod(method);
+    if (method.equals("POST")) {
+      String encodedData = "action=kill&user.name=" + userName;      
+      connection.setRequestProperty("Content-Type",
+                                    "application/x-www-form-urlencoded");
+      connection.setRequestProperty("Content-Length",
+                                    Integer.toString(encodedData.length()));
+      connection.setDoOutput(true);
+
+      OutputStream os = connection.getOutputStream();
+      os.write(encodedData.getBytes());
+    }
+    connection.connect();
+
+    return connection.getResponseCode();
+  }
+
+  public static class MyGroupsProvider extends ShellBasedUnixGroupsMapping {
+    static Map<String, List<String>> mapping = new HashMap<String, List<String>>();
+
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      return mapping.get(user);
+    }
+  }
+
+  /**
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) view the job.
+   * (1) jobSubmitter can view the job
+   * (2) superGroupMember can view the job
+   * (3) user mentioned in job-view-acls should be able to view the job
+   * (4) user mentioned in job-modify-acls but not in job-view-acls
+   *     cannot view the job
+   * (5) other unauthorized users cannot view the job
+   */
+  private void validateViewJob(String url, String method) throws IOException {
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, jobSubmitter, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, superGroupMember, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, mrOwner, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewColleague, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, viewAndModifyColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, modifyColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, unauthorizedUser, method));
+  }
+
+  /**
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) modify the job.
+   * (1) jobSubmitter and superGroupMember can modify the job. But we are not
+   *     validating this in this method. Let the caller explicitly validate
+   *     this, if needed.
+   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   *     modify the job
+   * (3) user mentioned in job-modify-acls (irrespective of job-view-acls)
+   *     can modify the job
+   * (4) other unauthorized users cannot modify the job
+   */
+  private void validateModifyJob(String url, String method) throws IOException {
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, viewColleague, method));
+    assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+        getHttpStatusCode(url, unauthorizedUser, method));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(url, modifyColleague, method));
+  }
+
+  // starts a sleep job with 1 map task that runs for a long time
+  private Job startSleepJobAsUser(String user, JobConf conf) throws Exception {
+	  final SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(conf);
+    UserGroupInformation jobSubmitterUGI = 
+    UserGroupInformation.createRemoteUser(user);
+    Job job = jobSubmitterUGI.doAs(new PrivilegedExceptionAction<Job>() {
+      public Job run() throws Exception {
+        // Very large sleep job.
+        Job job = sleepJob.createJob(1, 0, 900000, 1, 0, 0);
+        job.submit();
+        return job;
+      }
+    });
+    return job;
+  }
+
+  // Waits till the map task gets started and gets its tipId from map reports
+  // and returns the tipId
+  private TaskID getTIPId(MiniMRCluster cluster,
+      org.apache.hadoop.mapreduce.JobID jobid) throws Exception {
+    JobClient client = new JobClient(cluster.createJobConf());
+    JobID jobId = (JobID) jobid;
+    TaskReport[] mapReports = null;
+
+    TaskID tipId = null;
+    do { // make sure that the map task is running
+      Thread.sleep(200);
+      mapReports = client.getMapTaskReports(jobId);
+    } while (mapReports.length == 0);
+
+    for (TaskReport r : mapReports) {
+      tipId = r.getTaskID();
+      break;// because we have only one map
+    }
+    return tipId;
+  }
+
+  /**
+   * Make sure that the given user can do killJob using jobdetails.jsp url
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param jobTrackerJSP
+   * @param user
+   * @throws Exception
+   */
+  private void confirmJobDetailsJSPKillJobAsUser(MiniMRCluster cluster,
+      JobConf conf, String jtURL, String jobTrackerJSP, String user)
+      throws Exception {
+    Job job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobDetailsJSP killJob url
+    String url = jtURL + "/jobdetails.jsp?" +
+        "action=kill&jobid="+ jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, user, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+  }
+
+  /**
+   * Starts a sleep job and tries to kill the job using jobdetails.jsp as
+   * (1) viewColleague (2) unauthorizedUser (3) modifyColleague
+   * (4) viewAndModifyColleague (5) mrOwner (6) superGroupMember and
+   * (7) jobSubmitter
+   *
+   * Validates the given jsp/servlet against different user names who
+   * can(or cannot) do both view and modify on the job.
+   * (1) jobSubmitter, mrOwner and superGroupMember can do both view and modify
+   *     on the job. But we are not validating this in this method. Let the
+   *     caller explicitly validate this, if needed.
+   * (2) user mentioned in job-view-acls but not in job-modify-acls cannot
+   *     do this
+   * (3) user mentioned in job-modify-acls but not in job-view-acls cannot
+   *     do this
+   * (4) other unauthorized users cannot do this
+   *
+   * @throws Exception
+   */
+  private void validateJobDetailsJSPKillJob(MiniMRCluster cluster,
+      JobConf clusterConf, String jtURL) throws Exception {
+
+    JobConf conf = new JobConf(cluster.createJobConf());
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague + " group3");
+
+    // Let us add group1 and group3 to modify-job-acl. So modifyColleague and
+    // viewAndModifyColleague will be able to modify the job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, " group1,group3");
+
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    Job job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobDetailsJSPKillJobAction url
+    String url = jtURL + "/jobdetails.jsp?" +
+        "action=kill&jobid="+ jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, viewColleague, "POST"));
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, unauthorizedUser, "POST"));
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, modifyColleague, "POST"));
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, viewAndModifyColleague, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+
+    // check if jobSubmitter, mrOwner and superGroupMember can do killJob
+    // using jobdetails.jsp url
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       jobSubmitter);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       mrOwner);
+    confirmJobDetailsJSPKillJobAsUser(cluster, conf, jtURL, jobTrackerJSP,
+                                       superGroupMember);
+  }
+
+  /**
+   * Make sure that the given user can do killJob using jobtracker.jsp url
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param user
+   * @throws Exception
+   */
+  private void confirmJobTrackerJSPKillJobAsUser(MiniMRCluster cluster,
+      JobConf conf, String jtURL, String user)
+      throws Exception {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    Job job = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    // jobTrackerJSP killJob url
+    String url = jobTrackerJSP +
+        "&killJobs=true&jobCheckBox=" + jobid.toString();
+    try {
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, user, "POST"));
+    } finally {
+      if (!job.isComplete()) {
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+  }
+
+  /**
+   * Make sure that multiple jobs get killed using jobtracker.jsp url when
+   * user has modify access on only some of those jobs.
+   * @param cluster
+   * @param conf
+   * @param jtURL
+   * @param user
+   * @throws Exception
+   */
+  private void validateKillMultipleJobs(MiniMRCluster cluster,
+      JobConf conf, String jtURL) throws Exception {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    // jobTrackerJSP killJob url
+    String url = jobTrackerJSP + "&killJobs=true";
+    // view-job-acl doesn't matter for killJob from jobtracker jsp page
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, "");
+    
+    // Let us start jobs as 4 different users(none of these 4 users is
+    // mrOwner and none of these users is a member of superGroup). So only
+    // based on the config JobContext.JOB_ACL_MODIFY_JOB being set here,
+    // killJob on each of the jobs will be succeeded.
+
+    // start 1st job.
+    // Out of these 4 users, only jobSubmitter can do killJob on 1st job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, "");
+    Job job1 = startSleepJobAsUser(jobSubmitter, conf);
+    org.apache.hadoop.mapreduce.JobID jobid = job1.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 2nd job.
+    // Out of these 4 users, only viewColleague can do killJob on 2nd job
+    Job job2 = startSleepJobAsUser(viewColleague, conf);
+    jobid = job2.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 3rd job.
+    // Out of these 4 users, only modifyColleague can do killJob on 3rd job
+    Job job3 = startSleepJobAsUser(modifyColleague, conf);
+    jobid = job3.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+    // start 4rd job.
+    // Out of these 4 users, viewColleague and viewAndModifyColleague
+    // can do killJob on 4th job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, viewColleague);
+    Job job4 = startSleepJobAsUser(viewAndModifyColleague, conf);
+    jobid = job4.getID();
+    getTIPId(cluster, jobid);// wait till the map task is started
+    url = url.concat("&jobCheckBox=" + jobid.toString());
+
+    try {
+      // Try killing all the 4 jobs as user viewColleague who can kill only
+      // 2nd and 4th jobs. Check if 1st and 3rd jobs are not killed and
+      // 2nd and 4th jobs got killed
+      assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED,
+          getHttpStatusCode(url, viewColleague, "POST"));
+      assertFalse("killJob succeeded for a job for which user doesnot "
+          + " have job-modify permission", job1.isComplete());
+      assertFalse("killJob succeeded for a job for which user doesnot "
+          + " have job-modify permission", job3.isComplete());
+      assertTrue("killJob failed for a job for which user has "
+          + "job-modify permission", job2.isComplete());
+      assertTrue("killJob failed for a job for which user has "
+          + "job-modify permission", job4.isComplete());
+    } finally {
+      // Kill all 4 jobs as user mrOwner(even though some of them
+      // were already killed)
+      assertEquals(HttpURLConnection.HTTP_OK,
+          getHttpStatusCode(url, mrOwner, "GET"));
+    }
+  }
+
+  /**
+   * Run a job and validate if JSPs/Servlets are going through authentication
+   * and authorization.
+   * @throws Exception 
+   */
+  @Test
+  public void testWebUIAuthorization() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        MyGroupsProvider.class.getName());
+    Groups.getUserToGroupsMappingService(conf);
+    Properties props = new Properties();
+    props.setProperty("hadoop.http.filter.initializers",
+        DummyFilterInitializer.class.getName());
+    props.setProperty(
+       MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, String.valueOf(true));
+    props.setProperty("dfs.permissions.enabled", "false");
+    
+    props.setProperty(JSPUtil.PRIVATE_ACTIONS_KEY, "true");
+    props.setProperty("mapreduce.job.committer.setup.cleanup.needed", "false");
+    props.setProperty(MRConfig.MR_SUPERGROUP, "superGroup");
+
+    MyGroupsProvider.mapping.put(jobSubmitter, Arrays.asList("group1"));
+    MyGroupsProvider.mapping.put(viewColleague, Arrays.asList("group2"));
+    MyGroupsProvider.mapping.put(modifyColleague, Arrays.asList("group1"));
+    MyGroupsProvider.mapping.put(unauthorizedUser, Arrays.asList("evilSociety"));
+    MyGroupsProvider.mapping.put(superGroupMember, Arrays.asList("superGroup"));
+    MyGroupsProvider.mapping.put(viewAndModifyColleague, Arrays.asList("group3"));
+
+    mrOwner = UserGroupInformation.getCurrentUser().getShortUserName();
+    MyGroupsProvider.mapping.put(mrOwner, Arrays.asList(
+        new String[] { "group4", "group5" }));
+
+    startCluster(true, props);
+    MiniMRCluster cluster = getMRCluster();
+    int infoPort = cluster.getJobTrackerRunner().getJobTrackerInfoPort();
+
+    JobConf clusterConf = cluster.createJobConf();
+    conf = new JobConf(clusterConf);
+    conf.set(JobContext.JOB_ACL_VIEW_JOB, viewColleague + " group3");
+
+    // Let us add group1 and group3 to modify-job-acl. So modifyColleague and
+    // viewAndModifyColleague will be able to modify the job
+    conf.set(JobContext.JOB_ACL_MODIFY_JOB, " group1,group3");
+    
+    Job job = startSleepJobAsUser(jobSubmitter, conf);
+
+    org.apache.hadoop.mapreduce.JobID jobid = job.getID();
+
+    String jtURL = "http://localhost:" + infoPort;
+
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    try {
+      // Currently, authorization is not done for jobtracker page. So allow
+      // everyone to view it.
+      validateJobTrackerJSPAccess(jtURL);
+      validateJobDetailsJSPAccess(jobid, jtURL);
+      validateTaskGraphServletAccess(jobid, jtURL);
+      validateJobTasksJSPAccess(jobid, jtURL);
+      validateJobConfJSPAccess(jobid, jtURL);
+      validateJobFailuresJSPAccess(jobid, jtURL);
+      valiateJobBlacklistedTrackerJSPAccess(jobid, jtURL);
+      validateJobTrackerJSPSetPriorityAction(jobid, jtURL);
+
+      // Wait for the tip to start so as to test task related JSP
+      TaskID tipId = getTIPId(cluster, jobid);
+      validateTaskStatsJSPAccess(jobid, jtURL, tipId);
+      validateTaskDetailsJSPAccess(jobid, jtURL, tipId);
+      validateJobTrackerJSPKillJobAction(jobid, jtURL);
+    } finally {
+      if (!job.isComplete()) { // kill the job(as jobSubmitter) if needed
+        LOG.info("Killing job " + jobid + " from finally block");
+        assertEquals(HttpURLConnection.HTTP_OK,
+            getHttpStatusCode(jobTrackerJSP + "&killJobs=true&jobCheckBox=" +
+            jobid.toString(), jobSubmitter, "GET"));
+      }
+    }
+
+    // validate killJob of jobdetails.jsp
+    validateJobDetailsJSPKillJob(cluster, clusterConf, jtURL);
+
+    // validate killJob of jobtracker.jsp as users viewAndModifyColleague,
+    // jobSubmitter, mrOwner and superGroupMember
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
+        viewAndModifyColleague);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, jobSubmitter);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL, mrOwner);
+    confirmJobTrackerJSPKillJobAsUser(cluster, conf, jtURL,
+        superGroupMember);
+
+    // validate killing of multiple jobs using jobtracker jsp and check
+    // if all the jobs which can be killed by user are actually the ones that
+    // got killed
+    validateKillMultipleJobs(cluster, conf, jtURL);
+  }
+
+  // validate killJob of jobtracker.jsp
+  private void validateJobTrackerJSPKillJobAction(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    String jobTrackerJSPKillJobAction = jobTrackerJSP +
+        "&killJobs=true&jobCheckBox="+ jobid.toString();
+    validateModifyJob(jobTrackerJSPKillJobAction, "GET");
+  }
+
+  // validate viewing of job of taskdetails.jsp
+  private void validateTaskDetailsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
+      throws IOException {
+    String taskDetailsJSP =  jtURL + "/taskdetails.jsp?jobid=" +
+        jobid.toString() + "&tipid=" + tipId;
+    validateViewJob(taskDetailsJSP, "GET");
+  }
+
+  // validate taskstats.jsp
+  private void validateTaskStatsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL, TaskID tipId)
+      throws IOException {
+    String taskStatsJSP =  jtURL + "/taskstats.jsp?jobid=" +
+        jobid.toString() + "&tipid=" + tipId;
+    validateViewJob(taskStatsJSP, "GET");
+  }
+
+  // validate setJobPriority
+  private void validateJobTrackerJSPSetPriorityAction(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    String jobTrackerJSPSetJobPriorityAction = jobTrackerJSP +
+        "&changeJobPriority=true&setJobPriority="+"HIGH"+"&jobCheckBox=" +
+        jobid.toString();
+    validateModifyJob(jobTrackerJSPSetJobPriorityAction, "GET");
+    // jobSubmitter, mrOwner and superGroupMember are not validated for
+    // job-modify permission in validateModifyJob(). So let us do it
+    // explicitly here
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, jobSubmitter, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, superGroupMember, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK, getHttpStatusCode(
+        jobTrackerJSPSetJobPriorityAction, mrOwner, "GET"));
+  }
+
+  // validate access of jobblacklistedtrackers.jsp
+  private void valiateJobBlacklistedTrackerJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobBlacklistedTrackersJSP =  jtURL +
+        "/jobblacklistedtrackers.jsp?jobid="+jobid.toString();
+    validateViewJob(jobBlacklistedTrackersJSP, "GET");
+  }
+
+  // validate access of jobfailures.jsp
+  private void validateJobFailuresJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobFailuresJSP =  jtURL + "/jobfailures.jsp?jobid="+jobid.toString();
+    validateViewJob(jobFailuresJSP, "GET");
+  }
+
+  // validate access of jobconf.jsp
+  private void validateJobConfJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobConfJSP =  jtURL + "/jobconf.jsp?jobid="+jobid.toString();
+    validateViewJob(jobConfJSP, "GET");
+  }
+
+  // validate access of jobtasks.jsp
+  private void validateJobTasksJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobTasksJSP =  jtURL + "/jobtasks.jsp?jobid="
+        + jobid.toString() + "&type=map&pagenum=1&state=running";
+    validateViewJob(jobTasksJSP, "GET");
+  }
+
+  // validate access of TaskGraphServlet
+  private void validateTaskGraphServletAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String taskGraphServlet = jtURL + "/taskgraph?type=map&jobid="
+        + jobid.toString();
+    validateViewJob(taskGraphServlet, "GET");
+    taskGraphServlet = jtURL + "/taskgraph?type=reduce&jobid="
+        + jobid.toString();
+    validateViewJob(taskGraphServlet, "GET");
+  }
+
+  // validate access of jobdetails.jsp
+  private void validateJobDetailsJSPAccess(
+      org.apache.hadoop.mapreduce.JobID jobid, String jtURL)
+      throws IOException {
+    String jobDetailsJSP =  jtURL + "/jobdetails.jsp?jobid="
+        + jobid.toString();
+    validateViewJob(jobDetailsJSP, "GET");
+  }
+
+  // validate access of jobtracker.jsp
+  private void validateJobTrackerJSPAccess(String jtURL)
+      throws IOException {
+    String jobTrackerJSP =  jtURL + "/jobtracker.jsp?a=b";
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, jobSubmitter, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, viewColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, unauthorizedUser, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, modifyColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, viewAndModifyColleague, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, mrOwner, "GET"));
+    assertEquals(HttpURLConnection.HTTP_OK,
+        getHttpStatusCode(jobTrackerJSP, superGroupMember, "GET"));
+  }
+}

Added: hadoop/mapreduce/trunk/src/webapps/job/job_authorization_error.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/job_authorization_error.jsp?rev=918037&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/job_authorization_error.jsp (added)
+++ hadoop/mapreduce/trunk/src/webapps/job/job_authorization_error.jsp Tue Mar  2 13:49:52 2010
@@ -0,0 +1,52 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.net.URL"
+  import="org.apache.hadoop.util.*"
+%>
+<%!	private static final long serialVersionUID = 1L;
+%>
+
+<html>
+<head>
+<title>Error: User cannot access this Job</title>
+</head>
+<body>
+<h2>Error: User cannot do this operation on this Job</h2><br>
+
+<%
+  String errorMsg = (String) request.getAttribute("error.msg");
+%>
+
+<font size="5"> 
+<%
+  out.println(errorMsg);
+%>
+</font>
+
+<hr>
+
+<%
+out.println(ServletUtil.htmlFooter());
+%>