You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/02/20 20:15:50 UTC
svn commit: r912196 - in /hadoop/mapreduce/trunk: ./ src/java/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/protocol/
src/java/org/apache/hadoop/mapreduce/tools/ src/test/mapred/org/apach...
Author: ddas
Date: Sat Feb 20 19:15:50 2010
New Revision: 912196
URL: http://svn.apache.org/viewvc?rev=912196&view=rev
Log:
MAPREDUCE-1307. Introduces the Job level ACLs feature. Contributed by Vinod Kumar Vavilapalli.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/mapred-default.xml
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Feb 20 19:15:50 2010
@@ -57,6 +57,9 @@
MAPREDUCE-1341. Sqoop should have an option to create hive tables and
skip the table import step. (Leonid Furman via tomwhite)
+ MAPREDUCE-1307. Introduces the Job level ACLs feature.
+ (Vinod Kumar Vavilapalli via ddas)
+
IMPROVEMENTS
MAPREDUCE-1198. Alternatively schedule different types of tasks in
Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Sat Feb 20 19:15:50 2010
@@ -931,6 +931,72 @@
</property>
<property>
+ <name>mapreduce.cluster.job-authorization-enabled</name>
+ <value>false</value>
+ <description> Boolean flag that specifies if job-level authorization checks
+ should be enabled on the jobs submitted to the cluster. Job-level
+ authorization is enabled if this flag is set to true or disabled otherwise.
+ It is disabled by default. If enabled, access control checks are made by
+ JobTracker and TaskTracker when requests are made by users for viewing the
+ job-details (See mapreduce.job.acl-view-job) or for modifying the job
+ (See mapreduce.job.acl-modify-job) using Map/Reduce APIs, RPCs or via the
+ console and web user interfaces.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.job.acl-modify-job</name>
+ <value></value>
+ <description> Job specific access-control list for 'modifying' the job. It
+ is only used if authorization is enabled in Map/Reduce by setting the
+ configuration property mapreduce.cluster.job-authorization-enabled to true.
+ This specifies the list of users and/or groups who can do modification
+ operations on the job. For specifying a list of users and groups the
+ format to use is "user1,user2 group1,group2". If set to '*', it allows all
+ users/groups to modify this job. If set to '', it allows none. This
+ configuration is used to guard all the modifications with respect to this
+ job and takes care of all the following operations:
+ o killing this job
+ o killing a task of this job, failing a task of this job
+ o setting the priority of this job
+ Each of these operations is also protected by the per-queue level ACL
+ "acl-administer-jobs" configured via mapred-queues.xml. So a caller should
+ have the authorization to satisfy both the queue-level ACL and the
+ job-level ACL.
+
+ Irrespective of this ACL configuration, job-owner, superuser and members
+ of supergroup configured on JobTracker via mapred.permissions.supergroup,
+ can do all the modification operations.
+
+ By default, nobody else besides job-owner, superuser/supergroup can
+ perform modification operations on a job that they don't own.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.job.acl-view-job</name>
+ <value></value>
+ <description> Job specific access-control list for 'viewing' the job. It is
+ only used if authorization is enabled in Map/Reduce by setting the
+ configuration property mapreduce.cluster.job-authorization-enabled to true.
+ This specifies the list of users and/or groups who can view private details
+ about the job. For specifying a list of users and groups the
+ format to use is "user1,user2 group1,group2". If set to '*', it allows all
+ users/groups to view this job. If set to '', it allows none. This
+ configuration is used to guard some of the job-views and at present only
+ protects APIs that can return possibly sensitive information of the
+ job-owner like
+ o job-level counters
+ o task-level counters
+ o tasks' diagnostic information
+ o task-logs displayed on the TaskTracker web-UI and
+ o job.xml shown by the JobTracker's web-UI
+ Every other piece of information about jobs is still accessible by any other
+ users, for e.g., JobStatus, JobProfile, list of jobs in the queue, etc.
+ </description>
+</property>
+
+<property>
<name>mapreduce.tasktracker.indexcache.mb</name>
<value>10</value>
<description> The maximum memory that a task tracker allows for the
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Sat Feb 20 19:15:50 2010
@@ -27,7 +27,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.AccessControlException;
/**
* Persists and retrieves the Job info of a job into/from DFS.
@@ -46,13 +49,16 @@
private FileSystem fs;
private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
+ private JobACLsManager jobACLsManager = null;
+
public static final Log LOG =
LogFactory.getLog(CompletedJobStatusStore.class);
private static long HOUR = 1000 * 60 * 60;
private static long SLEEP_TIME = 1 * HOUR;
- CompletedJobStatusStore(Configuration conf) throws IOException {
+ CompletedJobStatusStore(JobACLsManager aclsManager, Configuration conf)
+ throws IOException {
active =
conf.getBoolean(JTConfig.JT_PERSIST_JOBSTATUS, false);
@@ -68,13 +74,21 @@
// set the fs
this.fs = path.getFileSystem(conf);
if (!fs.exists(path)) {
- fs.mkdirs(path);
+ if (!fs.mkdirs(path)) {
+ active = false;
+ LOG.warn("Couldn't create " + jobInfoDir
+ + ". CompletedJobStore will be inactive.");
+ return;
+ }
}
if (retainTime == 0) {
// as retain time is zero, all stored jobstatuses are deleted.
deleteJobStatusDirs();
}
+
+ this.jobACLsManager = aclsManager;
+
LOG.info("Completed job store activated/configured with retain-time : "
+ retainTime + " , job-info-dir : " + jobInfoDir);
} else {
@@ -275,18 +289,23 @@
*
* @param jobId the jobId for which Counters is queried
* @return Counters object, null if not able to retrieve
+ * @throws AccessControlException
*/
- public Counters readCounters(JobID jobId) {
+ public Counters readCounters(JobID jobId) throws AccessControlException {
Counters counters = null;
if (active) {
try {
FSDataInputStream dataIn = getJobInfoFile(jobId);
if (dataIn != null) {
- readJobStatus(dataIn);
+ JobStatus jobStatus = readJobStatus(dataIn);
+ jobACLsManager.checkAccess(jobStatus,
+ UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB);
readJobProfile(dataIn);
counters = readCounters(dataIn);
dataIn.close();
}
+ } catch (AccessControlException ace) {
+ throw ace;
} catch (IOException ex) {
LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobACLsManager.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+@InterfaceAudience.Private
+public class JobACLsManager {
+
+ private JobTracker jobTracker = null;
+
+ public JobACLsManager(JobTracker tracker) {
+ jobTracker = tracker;
+ }
+
+ /**
+ * Construct the jobACLs from the configuration so that they can be kept in
+ * the memory. If authorization is disabled on the JT, nothing is constructed
+ * and an empty map is returned.
+ *
+ * @return JobACL to AccessControlList map.
+ */
+ Map<JobACL, AccessControlList> constructJobACLs(JobConf conf) {
+
+ Map<JobACL, AccessControlList> acls =
+ new HashMap<JobACL, AccessControlList>();
+
+ // Don't construct anything if authorization is disabled.
+ if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+ return acls;
+ }
+
+ for (JobACL aclName : JobACL.values()) {
+ String aclConfigName = aclName.getAclName();
+ String aclConfigured = conf.get(aclConfigName);
+ if (aclConfigured == null) {
+ // If ACLs are not configured at all, we grant no access to anyone. So
+ // jobOwner and superuser/supergroup _only_ can do 'stuff'
+ aclConfigured = "";
+ }
+ acls.put(aclName, new AccessControlList(aclConfigured));
+ }
+ return acls;
+ }
+
+ /**
+ * If authorization is enabled on the JobTracker, checks whether the user (in
+ * the callerUGI) is authorized to perform the operation specified by
+ * 'jobOperation' on the job.
+ * <ul>
+ * <li>The owner of the job can do any operation on the job</li>
+ * <li>The superuser/supergroup of the JobTracker is always permitted to do
+ * operations on any job.</li>
+ * <li>For all other users/groups job-acls are checked</li>
+ * </ul>
+ *
+ * @param jobStatus
+ * @param callerUGI
+ * @param jobOperation
+ */
+ void checkAccess(JobStatus jobStatus, UserGroupInformation callerUGI,
+ JobACL jobOperation) throws AccessControlException {
+
+ if (!jobTracker.isJobLevelAuthorizationEnabled()) {
+ return;
+ }
+
+ JobID jobId = jobStatus.getJobID();
+
+ // Check for superusers/supergroups
+ if (jobTracker.isSuperUserOrSuperGroup(callerUGI)) {
+ JobInProgress.LOG.info("superuser/supergroup "
+ + callerUGI.getShortUserName() + " trying to perform "
+ + jobOperation.toString() + " on " + jobId);
+ return;
+ }
+
+ // Job-owner is always part of all the ACLs
+ if (callerUGI.getShortUserName().equals(jobStatus.getUsername())) {
+ JobInProgress.LOG.info("Jobowner " + callerUGI.getShortUserName()
+ + " trying to perform " + jobOperation.toString() + " on "
+ + jobId);
+ return;
+ }
+
+ AccessControlList acl = jobStatus.getJobACLs().get(jobOperation);
+ if (acl.isUserAllowed(callerUGI)) {
+ JobInProgress.LOG.info("Normal user " + callerUGI.getShortUserName()
+ + " trying to perform " + jobOperation.toString() + " on "
+ + jobId);
+ return;
+ }
+
+ throw new AccessControlException(callerUGI
+ + " not authorized for performing the operation "
+ + jobOperation.toString() + " on " + jobId + ". "
+ + jobOperation.toString() + " configured for this job : "
+ + acl.toString());
+ }
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Feb 20 19:15:50 2010
@@ -48,6 +48,7 @@
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
@@ -72,15 +73,17 @@
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
/*************************************************************
@@ -109,7 +112,6 @@
JobStatus status;
Path jobFile = null;
Path localJobFile = null;
- String user;
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
@@ -228,6 +230,7 @@
LocalFileSystem localFs;
FileSystem fs;
+ String user;
JobID jobId;
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
@@ -425,6 +428,9 @@
JobContext jobContext = new JobContextImpl(conf, jobId);
this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
+ // Construct the jobACLs
+ status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
+
this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
@@ -712,6 +718,25 @@
}
/**
+ * If authorization is enabled on the JobTracker, checks whether the user (in
+ * the callerUGI) is authorized to perform the operation specified by
+ * 'jobOperation' on the job.
+ * <ul>
+ * <li>The owner of the job can do any operation on the job</li>
+ * <li>The superuser/supergroup of the JobTracker is always permitted to do
+ * operations on any job.</li>
+ * <li>For all other users/groups job-acls are checked</li>
+ * </ul>
+ *
+ * @param callerUGI
+ * @param jobOperation
+ */
+ void checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)
+ throws AccessControlException {
+ jobtracker.getJobACLsManager().checkAccess(status, callerUGI, jobOperation);
+ }
+
+ /**
* If the number of taks is greater than the configured value
* throw an exception that will fail job initialization
*/
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Sat Feb 20 19:15:50 2010
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.mapred;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
/**************************************************
* Describes the current status of a job. This is
* not intended to be a comprehensive piece of data.
@@ -281,7 +286,11 @@
protected synchronized void setSchedulingInfo(String schedulingInfo) {
super.setSchedulingInfo(schedulingInfo);
}
-
+
+ protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+ super.setJobACLs(acls);
+ }
+
/**
* Set the priority of the job, defaulting to NORMAL.
* @param jp new job priority
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sat Feb 20 19:15:50 2010
@@ -71,6 +71,7 @@
import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -1252,6 +1253,7 @@
"expireLaunchingTasks");
final CompletedJobStatusStore completedJobStatusStore;
+ private JobACLsManager jobACLsManager;
Thread completedJobsStoreThread = null;
final RecoveryManager recoveryManager;
@@ -1587,8 +1589,10 @@
this.numTaskCacheLevels = conf.getInt(JT_TASKCACHE_LEVELS,
NetworkTopology.DEFAULT_HOST_LEVEL);
+ // Initialize the jobACLsManager
+ jobACLsManager = new JobACLsManager(this);
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(conf);
+ completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
}
private static SimpleDateFormat getDateFormat() {
@@ -2991,11 +2995,11 @@
throw new IOException("Queue \"" + queue + "\" is not running");
}
try {
- checkAccess(job, Queue.QueueOperation.SUBMIT_JOB, ugi);
- } catch (IOException ioe) {
+ checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+ } catch (AccessControlException ace) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
- + ". Ignoring job " + jobId, ioe);
- throw ioe;
+ + ". Ignoring job " + jobId, ace);
+ throw ace;
}
// Check the job if it cannot run in the cluster because of invalid memory
@@ -3046,30 +3050,56 @@
return job.getStatus();
}
- // Check whether the specified operation can be performed
- // related to the job.
- private void checkAccess(JobInProgress job,
- Queue.QueueOperation oper)
- throws IOException {
- // get the user group info
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- checkAccess(job, oper, ugi);
+ /**
+ * Is job-level authorization enabled on the JT?
+ *
+ * @return true if job-level authorization is enabled, false otherwise
+ */
+ boolean isJobLevelAuthorizationEnabled() {
+ return conf.getBoolean(
+ MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, false);
}
- // use the passed ugi for checking the access
- private void checkAccess(JobInProgress job, Queue.QueueOperation oper,
- UserGroupInformation ugi) throws IOException {
- // get the queue
+ /**
+ * Check the ACLs for a user doing the passed queue-operation and the passed
+ * job operation.
+ * <ul>
+ * <li>Superuser/supergroup can do any operation on the job</li>
+ * <li>For any other user/group, the configured ACLs for the corresponding
+ * queue and the job are checked.</li>
+ * </ul>
+ *
+ * @param job
+ * @param callerUGI
+ * @param oper
+ * @param jobOperation
+ * @throws AccessControlException
+ * @throws IOException
+ */
+ private void checkAccess(JobInProgress job,
+ UserGroupInformation callerUGI, Queue.QueueOperation oper,
+ JobACL jobOperation) throws AccessControlException {
+
+ // get the queue and verify the queue access
String queue = job.getProfile().getQueueName();
- if (!queueManager.hasAccess(queue, job, oper, ugi)) {
+ if (!queueManager.hasAccess(queue, job, oper, callerUGI)) {
throw new AccessControlException("User "
- + ugi.getShortUserName()
+ + callerUGI.getShortUserName()
+ " cannot perform "
+ "operation " + oper + " on queue " + queue +
".\n Please run \"hadoop queue -showacls\" " +
"command to find the queues you have access" +
" to .");
}
+
+ // check nulls, for e.g., submitJob RPC doesn't have a jobOperation as the
+ // job itself isn't created by that time.
+ if (jobOperation == null) {
+ return;
+ }
+
+ // check the access to the job
+ job.checkAccess(callerUGI, jobOperation);
}
/**@deprecated use {@link #getClusterStatus(boolean)}*/
@@ -3156,6 +3186,10 @@
return info;
}
+ /**
+ * @see ClientProtocol#killJob(org.apache.hadoop.mapreduce.JobID)
+ */
+ @Override
public synchronized void killJob(org.apache.hadoop.mapreduce.JobID jobid)
throws IOException {
killJob(JobID.downgrade(jobid));
@@ -3177,8 +3211,11 @@
LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job");
return;
}
-
- checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
+
+ // check both queue-level and job-level access
+ checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
killJob(job);
}
@@ -3288,10 +3325,9 @@
}
/**
- * Set the priority of a job
- * @param jobid id of the job
- * @param priority new priority of the job
+ * @see ClientProtocol#setJobPriority(org.apache.hadoop.mapreduce.JobID, String)
*/
+ @Override
public synchronized void setJobPriority(org.apache.hadoop.mapreduce.JobID
jobid, String priority) throws IOException {
setJobPriority(JobID.downgrade(jobid), priority);
@@ -3313,7 +3349,6 @@
+ " is not a valid job");
return;
}
- checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
JobPriority newPriority = JobPriority.valueOf(priority);
setJobPriority(jobid, newPriority);
}
@@ -3341,7 +3376,11 @@
}
return completedJobStatusStore.readJobProfile(jobid);
}
-
+
+ /**
+ * see {@link ClientProtocol#getJobStatus(org.apache.hadoop.mapreduce.JobID)}
+ */
+ @Override
public JobStatus getJobStatus(org.apache.hadoop.mapreduce.JobID jobid) {
return getJobStatus(JobID.downgrade(jobid));
}
@@ -3361,7 +3400,6 @@
if (job != null) {
return job.getStatus();
} else {
-
JobStatus status = retireJobs.get(jobid);
if (status != null) {
return status;
@@ -3370,10 +3408,38 @@
}
return completedJobStatusStore.readJobStatus(jobid);
}
-
+
+ /**
+ * see
+ * {@link ClientProtocol#getJobCounters(org.apache.hadoop.mapreduce.JobID)}
+ *
+ * @throws IOException
+ * @throws AccessControlException
+ */
+ @Override
public org.apache.hadoop.mapreduce.Counters getJobCounters(
- org.apache.hadoop.mapreduce.JobID jobid) {
- Counters counters = getJobCounters(JobID.downgrade(jobid));
+ org.apache.hadoop.mapreduce.JobID jobid)
+ throws AccessControlException, IOException {
+
+ JobID oldJobID = JobID.downgrade(jobid);
+
+ synchronized (this) {
+ JobInProgress job = jobs.get(oldJobID);
+ if (job != null) {
+
+ // check the job-access
+ job.checkAccess(UserGroupInformation.getCurrentUser(),
+ JobACL.VIEW_JOB);
+
+ Counters counters = job.getCounters();
+ if (counters != null) {
+ return new org.apache.hadoop.mapreduce.Counters(counters);
+ }
+ return null;
+ }
+ }
+
+ Counters counters = completedJobStatusStore.readCounters(oldJobID);
if (counters != null) {
return new org.apache.hadoop.mapreduce.Counters(counters);
}
@@ -3386,13 +3452,14 @@
*/
@Deprecated
public Counters getJobCounters(JobID jobid) {
- synchronized (this) {
- JobInProgress job = jobs.get(jobid);
- if (job != null) {
- return job.getCounters();
- }
- }
- return completedJobStatusStore.readCounters(jobid);
+ try {
+ return Counters.downgrade(
+ getJobCounters((org.apache.hadoop.mapreduce.JobID) jobid));
+ } catch (AccessControlException e) {
+ return null;
+ } catch (IOException e) {
+ return null;
+ }
}
/**
@@ -3514,8 +3581,26 @@
}
}
+ /**
+ * see
+ * {@link ClientProtocol#getTaskReports(org.apache.hadoop.mapreduce.JobID, TaskType)}
+ * @throws IOException
+ * @throws AccessControlException
+ */
+ @Override
public synchronized TaskReport[] getTaskReports(
- org.apache.hadoop.mapreduce.JobID jobid, TaskType type) {
+ org.apache.hadoop.mapreduce.JobID jobid, TaskType type)
+ throws AccessControlException, IOException {
+
+ // Check authorization
+ JobInProgress job = jobs.get(jobid);
+ if (job != null) {
+ job.checkAccess(UserGroupInformation.getCurrentUser(),
+ JobACL.VIEW_JOB);
+ } else {
+ return new TaskReport[0];
+ }
+
switch (type) {
case MAP :
return getMapTaskReports(JobID.downgrade(jobid));
@@ -3573,6 +3658,7 @@
throws IOException {
return getTaskDiagnostics(TaskAttemptID.downgrade(taskId));
}
+
/**
* Get the diagnostics for a given task
* @param taskId the id of the task
@@ -3586,6 +3672,11 @@
TaskID tipId = taskId.getTaskID();
JobInProgress job = jobs.get(jobId);
if (job != null) {
+
+ // check the access to the job.
+ job.checkAccess(UserGroupInformation.getCurrentUser(),
+ JobACL.VIEW_JOB);
+
TaskInProgress tip = job.getTaskInProgress(tipId);
if (tip != null) {
taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
@@ -3635,6 +3726,11 @@
return (job == null ? null : job.getTaskInProgress(tipid));
}
+ /**
+ * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#killTask(org.apache.hadoop.mapreduce.TaskAttemptID,
+ * boolean)
+ */
+ @Override
public synchronized boolean killTask(
org.apache.hadoop.mapreduce.TaskAttemptID taskid,
boolean shouldFail) throws IOException {
@@ -3647,7 +3743,11 @@
boolean shouldFail) throws IOException {
TaskInProgress tip = taskidToTIPMap.get(taskid);
if(tip != null) {
- checkAccess(tip.getJob(), Queue.QueueOperation.ADMINISTER_JOBS);
+
+ // check both queue-level and job-level access
+ checkAccess(tip.getJob(), UserGroupInformation.getCurrentUser(),
+ Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
return tip.killTask(taskid, shouldFail);
}
else {
@@ -3668,7 +3768,10 @@
public JobStatus[] jobsToComplete() {
return getJobStatus(jobs.values(), true);
}
-
+
+ /**
+ * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getAllJobs()
+ */
public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {
List<JobStatus> list = new ArrayList<JobStatus>();
list.addAll(Arrays.asList(getJobStatus(jobs.values(),false)));
@@ -3733,12 +3836,21 @@
/**
* Change the run-time priority of the given job.
+ *
* @param jobId job id
* @param priority new {@link JobPriority} for the job
+ * @throws IOException
+ * @throws AccessControlException
*/
- synchronized void setJobPriority(JobID jobId, JobPriority priority) {
+ synchronized void setJobPriority(JobID jobId, JobPriority priority)
+ throws AccessControlException, IOException {
JobInProgress job = jobs.get(jobId);
if (job != null) {
+
+ // check both queue-level and job-level access
+ checkAccess(job, UserGroupInformation.getCurrentUser(),
+ Queue.QueueOperation.ADMINISTER_JOBS, JobACL.MODIFY_JOB);
+
synchronized (taskScheduler) {
JobStatus oldStatus = (JobStatus)job.getStatus().clone();
job.setPriority(priority);
@@ -3920,16 +4032,14 @@
}
/**
- * Is the current user a super user?
+ * Is the calling user a super user? Or part of the supergroup?
* @return true, if it is a super user
- * @throws IOException if there are problems getting the current user
*/
- private synchronized boolean isSuperUser() throws IOException {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- if (mrOwner.getShortUserName().equals(ugi.getShortUserName()) ) {
+ boolean isSuperUserOrSuperGroup(UserGroupInformation callerUGI) {
+ if (mrOwner.getShortUserName().equals(callerUGI.getShortUserName())) {
return true;
}
- String[] groups = ugi.getGroupNames();
+ String[] groups = callerUGI.getGroupNames();
for(int i=0; i < groups.length; ++i) {
if (groups[i].equals(supergroup)) {
return true;
@@ -3944,7 +4054,7 @@
*/
public synchronized void refreshNodes() throws IOException {
// check access
- if (!isSuperUser()) {
+ if (!isSuperUserOrSuperGroup(UserGroupInformation.getCurrentUser())) {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
throw new AccessControlException(user +
" is not authorized to refresh nodes.");
@@ -3954,6 +4064,10 @@
refreshHosts();
}
+ String getSuperGroup() {
+ return supergroup;
+ }
+
private synchronized void refreshHosts() throws IOException {
// Reread the config to get HOSTS and HOSTS_EXCLUDE filenames.
// Update the file names and refresh internal includes and excludes list
@@ -4457,8 +4571,11 @@
this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
NetworkTopology.DEFAULT_HOST_LEVEL);
+ // Initialize the jobACLsManager
+ jobACLsManager = new JobACLsManager(this);
+
//initializes the job status store
- completedJobStatusStore = new CompletedJobStatusStore(conf);
+ completedJobStatusStore = new CompletedJobStatusStore(jobACLsManager, conf);
}
/**
@@ -4510,4 +4627,8 @@
String user = UserGroupInformation.getCurrentUser().getUserName();
return secretManager.renewToken(token, user);
}
+
+ JobACLsManager getJobACLsManager() {
+ return jobACLsManager;
+ }
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobACL.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.classification.*;
+
+/**
+ * Job related ACLs
+ */
+@InterfaceAudience.Private
+public enum JobACL {
+
+ /**
+ * ACL for 'viewing' job. Dictates who can 'view' some or all of the job
+ * related details.
+ */
+ VIEW_JOB(JobContext.JOB_ACL_VIEW_JOB),
+
+ /**
+ * ACL for 'modifying' job. Dictates who can 'modify' the job, e.g., by
+ * killing the job, killing/failing a task of the job or setting the priority
+ * of the job.
+ */
+ MODIFY_JOB(JobContext.JOB_ACL_MODIFY_JOB);
+
+ String aclName;
+
+ JobACL(String name) {
+ this.aclName = name;
+ }
+
+ /**
+ * Get the name of the ACL. It is the same as the name of the configuration
+ * property for specifying the ACL for the job.
+ *
+ * @return aclName
+ */
+ public String getAclName() {
+ return aclName;
+ }
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Sat Feb 20 19:15:50 2010
@@ -229,7 +229,11 @@
"mapreduce.reduce.merge.memtomem.enabled";
public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal";
-
+
+ public static final String JOB_ACL_VIEW_JOB =
+ "mapreduce.job.acl-view-job";
+ public static final String JOB_ACL_MODIFY_JOB =
+ "mapreduce.job.acl-modify-job";
/**
* Return the configuration for the job.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java Sat Feb 20 19:15:50 2010
@@ -20,12 +20,16 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.authorize.AccessControlList;
/**************************************************
* Describes the current status of a job.
@@ -73,6 +77,9 @@
private JobPriority priority;
private String schedulingInfo="NA";
+ private Map<JobACL, AccessControlList> jobACLs =
+ new HashMap<JobACL, AccessControlList>();
+
private String jobName;
private String jobFile;
private long finishTime;
@@ -222,7 +229,11 @@
protected synchronized void setSchedulingInfo(String schedulingInfo) {
this.schedulingInfo = schedulingInfo;
}
-
+
+ protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+ this.jobACLs = acls;
+ }
+
/**
* @return Percentage of progress in maps
*/
@@ -281,6 +292,10 @@
return schedulingInfo;
}
+ public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+ return jobACLs;
+ }
+
/**
* Return the priority of the job
* @return job priority
@@ -316,6 +331,13 @@
Text.writeString(out, jobName);
Text.writeString(out, trackingUrl);
Text.writeString(out, jobFile);
+
+ // Serialize the job's ACLs
+ out.writeInt(jobACLs.size());
+ for (Entry<JobACL, AccessControlList> entry : jobACLs.entrySet()) {
+ WritableUtils.writeEnum(out, entry.getKey());
+ Text.writeString(out, entry.getValue().toString());
+ }
}
public synchronized void readFields(DataInput in) throws IOException {
@@ -336,6 +358,14 @@
this.jobName = Text.readString(in);
this.trackingUrl = Text.readString(in);
this.jobFile = Text.readString(in);
+
+ // De-serialize the job's ACLs
+ int numACLs = in.readInt();
+ for (int i = 0; i < numACLs; i++) {
+ JobACL aclType = WritableUtils.readEnum(in, JobACL.class);
+ String acl = Text.readString(in);
+ this.jobACLs.put(aclType, new AccessControlList(acl));
+ }
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MRConfig.java Sat Feb 20 19:15:50 2010
@@ -35,6 +35,8 @@
public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
public static final String REDUCEMEMORY_MB =
"mapreduce.cluster.reducememory.mb";
+ public static final String JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG =
+ "mapreduce.cluster.job-authorization-enabled";
//Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Sat Feb 20 19:15:50 2010
@@ -101,9 +101,10 @@
* user home dir. JobTracker reads the required files from the
* staging area using user credentials passed via the rpc.
* Version 31: Added TokenStorage to submitJob
- * Version 32: Added delegation tokens (add, renew, cancel)
+ * Version 32: Added delegation tokens (add, renew, cancel)
+ * Version 33: Added JobACLs to JobStatus as part of MAPREDUCE-1307
*/
- public static final long versionID = 32L;
+ public static final long versionID = 33L;
/**
* Allocate a name for the job.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java Sat Feb 20 19:15:50 2010
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
@@ -39,6 +40,7 @@
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -218,9 +220,9 @@
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
+ Counters counters = job.getCounters();
System.out.println();
System.out.println(job);
- Counters counters = job.getCounters();
if (counters != null) {
System.out.println(counters);
} else {
@@ -307,6 +309,13 @@
exitCode = -1;
}
}
+ } catch (RemoteException re) {
+ IOException unwrappedException = re.unwrapRemoteException();
+ if (unwrappedException instanceof AccessControlException) {
+ System.out.println(unwrappedException.getMessage());
+ } else {
+ throw re;
+ }
} finally {
cluster.close();
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Sat Feb 20 19:15:50 2010
@@ -37,6 +37,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -550,8 +551,12 @@
/**
* Change the job's priority
+ *
+ * @throws IOException
+ * @throws AccessControlException
*/
- public void setJobPriority(JobID jobId, JobPriority priority) {
+ public void setJobPriority(JobID jobId, JobPriority priority)
+ throws AccessControlException, IOException {
jobTracker.getJobTracker().setJobPriority(jobId, priority);
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java Sat Feb 20 19:15:50 2010
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -32,7 +34,12 @@
static final Path TEST_DIR =
new Path(System.getProperty("test.build.data","/tmp"),
"job-status-persistence");
-
+
+ @Override
+ protected void setUp() throws Exception {
+ // Don't start anything by default
+ }
+
private JobID runJob() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
Writer wr = new OutputStreamWriter(os);
@@ -65,6 +72,7 @@
}
public void testNonPersistency() throws Exception {
+ startCluster(true, null);
JobID jobId = runJob();
JobClient jc = new JobClient(createJobConf());
RunningJob rj = jc.getJob(jobId);
@@ -80,8 +88,7 @@
Properties config = new Properties();
config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS, "true");
config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
- stopCluster();
- startCluster(false, config);
+ startCluster(true, config);
JobID jobId = runJob();
JobClient jc = new JobClient(createJobConf());
RunningJob rj0 = jc.getJob(jobId);
@@ -113,7 +120,7 @@
* Test if the completed job status is persisted to localfs.
*/
public void testLocalPersistency() throws Exception {
- FileSystem fs = FileSystem.getLocal(createJobConf());
+ FileSystem fs = FileSystem.getLocal(new JobConf());
fs.delete(TEST_DIR, true);
@@ -122,8 +129,7 @@
config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_DIR,
fs.makeQualified(TEST_DIR).toString());
- stopCluster();
- startCluster(false, config);
+ startCluster(true, config);
JobID jobId = runJob();
JobClient jc = new JobClient(createJobConf());
RunningJob rj = jc.getJob(jobId);
@@ -134,4 +140,44 @@
assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
fs.delete(TEST_DIR, true);
}
+
+ /**
+ * Verify that completed-job store is inactive if the jobinfo path is not
+ * writable.
+ *
+ * @throws Exception
+ */
+ public void testJobStoreDisablingWithInvalidPath() throws Exception {
+ MiniMRCluster mr = null;
+ Path parent = new Path(TEST_DIR, "parent");
+ try {
+ FileSystem fs = FileSystem.getLocal(new JobConf());
+
+ if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
+ fail("Cannot delete TEST_DIR!");
+ }
+
+ if (fs.mkdirs(new Path(TEST_DIR, parent))) {
+ if (!(new File(parent.toUri().getPath()).setWritable(false, false))) {
+ fail("Cannot chmod parent!");
+ }
+ } else {
+ fail("Cannot create parent dir!");
+ }
+ JobConf config = new JobConf();
+ config.set(JTConfig.JT_PERSIST_JOBSTATUS, "true");
+ config.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+ config.set(JTConfig.JT_PERSIST_JOBSTATUS_DIR, new Path(parent,
+ "child").toUri().getPath());
+ mr = new MiniMRCluster(0, "file:///", 4, null, null, config);
+ assertFalse(
+ "CompletedJobStore is unexpectly active!",
+ mr.getJobTrackerRunner().getJobTracker().completedJobStatusStore.isActive());
+ } finally {
+ new File(parent.toUri().getPath()).setWritable(true, false);
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java?rev=912196&r1=912195&r2=912196&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java Sat Feb 20 19:15:50 2010
@@ -142,8 +142,18 @@
.downgrade(jobID));
tracker.initJob(jip);
try {
- tracker.killJob(jobID);
- fail("current user is neither u1 nor in the administer group list");
+ final Configuration userConf =
+ new Configuration(miniMRCluster.createJobConf());
+ UserGroupInformation ugi =
+ UserGroupInformation.createUserForTesting("someRandomUser",
+ new String[] { "someRandomGroup" });
+ cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
+ public Cluster run() throws IOException {
+ return new Cluster(userConf);
+ }
+ });
+ cluster.getJob(jobID).killJob();
+ fail("user 'someRandomeUser' is neither u1 nor in the administer group list");
} catch (Exception e) {
final Configuration userConf = new Configuration(miniMRCluster.createJobConf());
UserGroupInformation ugi =
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java?rev=912196&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestJobACLs.java Sat Feb 20 19:15:50 2010
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.After;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Verify the job-ACLs
+ *
+ */
+public class TestJobACLs {
+
+ static final Log LOG = LogFactory.getLog(TestJobACLs.class);
+
+ private MiniMRCluster mr = null;
+
+ private static final Path TEST_DIR =
+ new Path(System.getProperty("test.build.data", "/tmp"),
+ TestJobACLs.class.getCanonicalName() + Path.SEPARATOR
+ + "completed-job-store");
+
+ /**
+ * Start the cluster before running the actual test.
+ *
+ * @throws IOException
+ */
+ @Before
+ public void setup() throws IOException {
+ // Start the cluster
+ startCluster(false);
+ }
+
+ private void startCluster(boolean reStart) throws IOException {
+ UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser();
+ JobConf conf = new JobConf();
+
+ // Enable job-level authorization
+ conf.setBoolean(MRConfig.JOB_LEVEL_AUTHORIZATION_ENABLING_FLAG, true);
+
+ // Enable CompletedJobStore
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (!reStart) {
+ fs.delete(TEST_DIR, true);
+ }
+ conf.set(JTConfig.JT_PERSIST_JOBSTATUS_DIR,
+ fs.makeQualified(TEST_DIR).toString());
+ conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+ conf.set(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+
+ mr =
+ new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, MR_UGI, conf);
+ }
+
+ /**
+ * Kill the cluster after the test is done.
+ */
+ @After
+ public void tearDown() {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+
+ /**
+ * Test view-job-acl, modify-job-acl and acl persistence to the
+ * completed-jobs-store.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ @Test
+ public void testACLS() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ verifyACLViewJob();
+ verifyACLModifyJob();
+ verifyACLPersistence();
+ }
+
+ /**
+ * Verify JobContext.JOB_ACL_VIEW_JOB
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void verifyACLViewJob() throws IOException, InterruptedException {
+
+ // Set the job up.
+ final Configuration myConf = mr.createJobConf();
+ myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user3");
+
+ // Submit the job as user1
+ Job job = submitJobAsUser(myConf, "user1");
+
+ final JobID jobId = job.getID();
+
+ // Try operations as an unauthorized user.
+ verifyViewJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+ // Try operations as an authorized user.
+ verifyViewJobAsAuthorizedUser(myConf, jobId, "user3");
+
+ // Clean up the job
+ job.killJob();
+ }
+
+ private Job submitJobAsUser(final Configuration clusterConf, String user)
+ throws IOException, InterruptedException {
+ UserGroupInformation ugi =
+ UserGroupInformation.createUserForTesting(user, new String[] {});
+ Job job = (Job) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(clusterConf);
+ Job myJob = sleepJob.createJob(0, 0, 1000, 1000, 1000, 1000);
+ myJob.submit();
+ return myJob;
+ }
+ });
+ return job;
+ }
+
+ private void verifyViewJobAsAuthorizedUser(final Configuration myConf,
+ final JobID jobId, String authorizedUser) throws IOException,
+ InterruptedException {
+ UserGroupInformation authorizedUGI =
+ UserGroupInformation.createUserForTesting(authorizedUser,
+ new String[] {});
+ authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @SuppressWarnings("null")
+ @Override
+ public Object run() throws Exception {
+ Job myJob = null;
+ try {
+ Cluster cluster = new Cluster(myConf);
+ myJob = cluster.getJob(jobId);
+ } catch (Exception e) {
+ fail("Exception .." + e);
+ }
+
+ assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+ myJob);
+
+ // Tests authorization with getCounters
+ try {
+ myJob.getCounters();
+ } catch (IOException ioe) {
+ fail("Unexpected.. exception.. " + ioe);
+ }
+
+ // Tests authorization with getTaskReports
+ try {
+ myJob.getTaskReports(TaskType.JOB_CLEANUP);
+ } catch (IOException ioe) {
+ fail("Unexpected.. exception.. " + ioe);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ private void verifyViewJobAsUnauthorizedUser(final Configuration myConf,
+ final JobID jobId, String unauthorizedUser) throws IOException,
+ InterruptedException {
+ UserGroupInformation unauthorizedUGI =
+ UserGroupInformation.createUserForTesting(unauthorizedUser,
+ new String[] {});
+ unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @SuppressWarnings("null")
+ @Override
+ public Object run() {
+ Job myJob = null;
+ try {
+ Cluster cluster = new Cluster(myConf);
+ myJob = cluster.getJob(jobId);
+ } catch (Exception e) {
+ fail("Exception .." + e);
+ }
+
+ assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+ myJob);
+
+ // Tests authorization failure with getCounters
+ try {
+ myJob.getCounters();
+ fail("AccessControlException expected..");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("AccessControlException"));
+ } catch (InterruptedException e) {
+ fail("Exception .. interrupted.." + e);
+ }
+
+ // Tests authorization failure with getTaskReports
+ try {
+ myJob.getTaskReports(TaskType.JOB_SETUP);
+ fail("AccessControlException expected..");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("AccessControlException"));
+ } catch (InterruptedException e) {
+ fail("Exception .. interrupted.." + e);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Verify JobContext.JOB_ACL_MODIFY_JOB
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ private void verifyACLModifyJob() throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ // Set the job up.
+ final Configuration myConf = mr.createJobConf();
+ myConf.set(JobContext.JOB_ACL_MODIFY_JOB, "user1,user3");
+
+ // Submit the job as user1
+ Job job = submitJobAsUser(myConf, "user1");
+
+ final JobID jobId = job.getID();
+
+ // Try operations as an unauthorized user.
+ verifyModifyJobAsUnauthorizedUser(myConf, jobId, "user2");
+
+ // Try operations as an authorized user.
+ verifyModifyJobAsAuthorizedUser(myConf, jobId, "user3");
+ }
+
+ private void verifyModifyJobAsAuthorizedUser(
+ final Configuration clusterConf, final JobID jobId,
+ String authorizedUser) throws IOException, InterruptedException {
+ UserGroupInformation authorizedUGI =
+ UserGroupInformation.createUserForTesting(authorizedUser,
+ new String[] {});
+ authorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @SuppressWarnings("null")
+ @Override
+ public Object run() throws Exception {
+ Job myJob = null;
+ try {
+ Cluster cluster = new Cluster(clusterConf);
+ myJob = cluster.getJob(jobId);
+ } catch (Exception e) {
+ fail("Exception .." + e);
+ }
+
+ assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+ myJob);
+
+ // Test authorization success with setJobPriority
+ try {
+ myJob.setPriority(JobPriority.HIGH);
+ assertEquals(myJob.getPriority(), JobPriority.HIGH);
+ } catch (IOException ioe) {
+ fail("Unexpected.. exception.. " + ioe);
+ }
+
+ // Test authorization success with killJob
+ try {
+ myJob.killJob();
+ } catch (IOException ioe) {
+ fail("Unexpected.. exception.. " + ioe);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ private void verifyModifyJobAsUnauthorizedUser(
+ final Configuration clusterConf, final JobID jobId,
+ String unauthorizedUser) throws IOException, InterruptedException {
+ UserGroupInformation unauthorizedUGI =
+ UserGroupInformation.createUserForTesting(unauthorizedUser,
+ new String[] {});
+ unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @SuppressWarnings("null")
+ @Override
+ public Object run() {
+ Job myJob = null;
+ try {
+ Cluster cluster = new Cluster(clusterConf);
+ myJob = cluster.getJob(jobId);
+ } catch (Exception e) {
+ fail("Exception .." + e);
+ }
+
+ assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+ myJob);
+
+ // Tests authorization failure with killJob
+ try {
+ myJob.killJob();
+ fail("AccessControlException expected..");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("AccessControlException"));
+ } catch (InterruptedException e) {
+ fail("Exception .. interrupted.." + e);
+ }
+
+ // Tests authorization failure with setJobPriority
+ try {
+ myJob.setPriority(JobPriority.HIGH);
+ fail("AccessControlException expected..");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("AccessControlException"));
+ } catch (InterruptedException e) {
+ fail("Exception .. interrupted.." + e);
+ }
+
+ return null;
+ }
+ });
+ }
+
+ private void verifyACLPersistence() throws IOException,
+ InterruptedException {
+
+ // Set the job up.
+ final Configuration myConf = mr.createJobConf();
+ myConf.set(JobContext.JOB_ACL_VIEW_JOB, "user1,user2");
+
+ // Submit the job as user1
+ Job job = submitJobAsUser(myConf, "user1");
+
+ final JobID jobId = job.getID();
+
+ // Kill the job and wait till it is actually killed so that it is written to
+ // CompletedJobStore
+ job.killJob();
+ while (job.getJobState() != JobStatus.State.KILLED) {
+ LOG.info("Waiting for the job to be killed successfully..");
+ Thread.sleep(200);
+ }
+
+ // Now kill the cluster, so that the job is 'forgotten'
+ tearDown();
+
+ // Re-start the cluster
+ startCluster(true);
+
+ final Configuration myNewJobConf = mr.createJobConf();
+ // Now verify view-job works off CompletedJobStore
+ verifyViewJobAsAuthorizedUser(myNewJobConf, jobId, "user2");
+
+ // Only JobCounters is persisted on the JobStore. So test counters only.
+ UserGroupInformation unauthorizedUGI =
+ UserGroupInformation.createUserForTesting("user3", new String[] {});
+ unauthorizedUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @SuppressWarnings("null")
+ @Override
+ public Object run() {
+ Job myJob = null;
+ try {
+ Cluster cluster = new Cluster(myNewJobConf);
+ myJob = cluster.getJob(jobId);
+ } catch (Exception e) {
+ fail("Exception .." + e);
+ }
+
+ assertNotNull("Job " + jobId + " is not known to the JobTracker!",
+ myJob);
+
+ // Tests authorization failure with getCounters
+ try {
+ myJob.getCounters();
+ fail("AccessControlException expected..");
+ } catch (IOException ioe) {
+ assertTrue(ioe.getMessage().contains("AccessControlException"));
+ } catch (InterruptedException e) {
+ fail("Exception .. interrupted.." + e);
+ }
+
+ return null;
+ }
+ });
+
+ }
+}