You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:23:46 UTC
svn commit: r1076934 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapred/ test/org/apache/...
Author: omalley
Date: Fri Mar 4 03:23:45 2011
New Revision: 1076934
URL: http://svn.apache.org/viewvc?rev=1076934&view=rev
Log:
commit 2db8f881c11e434698f1bc9c534dae2c277dae1a
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:18 2009 -0700
Applying patch 2701949.4490.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:23:45 2011
@@ -200,7 +200,7 @@ public class TestCapacityScheduler exten
}
}
TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -242,7 +242,7 @@ public class TestCapacityScheduler exten
}
}
TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
- Task task = new ReduceTask("", attemptId, 0, 10) {
+ Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar 4 03:23:45 2011
@@ -67,7 +67,7 @@ public class TestFairScheduler extends T
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(true);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -82,7 +82,7 @@ public class TestFairScheduler extends T
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(false);
- Task task = new ReduceTask("", attemptId, 0, 10) {
+ Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar 4 03:23:45 2011
@@ -206,11 +206,11 @@ public class IsolationRunner {
BytesWritable split = new BytesWritable();
split.readFields(splitFile);
splitFile.close();
- task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
+ task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, conf.getUser());
} else {
int numMaps = conf.getNumMapTasks();
fillInMissingMapOutputs(local, taskId, numMaps, conf);
- task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
+ task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, conf.getUser());
}
task.setConf(conf);
task.run(conf, new FakeUmbilical());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:23:45 2011
@@ -70,7 +70,6 @@ class JobInProgress {
JobStatus status;
Path jobFile = null;
Path localJobFile = null;
- Path localJarFile = null;
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
@@ -185,6 +184,7 @@ class JobInProgress {
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
private long inputLength = 0;
+ private String user;
// Per-job counters
public static enum Counter {
@@ -236,6 +236,12 @@ class JobInProgress {
public JobInProgress(JobID jobid, JobTracker jobtracker,
JobConf default_conf, int rCount) throws IOException {
+ this(jobid, jobtracker, default_conf, null, rCount);
+ }
+
+ JobInProgress(JobID jobid, JobTracker jobtracker,
+ JobConf default_conf, String user, int rCount)
+ throws IOException {
this.restartCount = rCount;
this.jobId = jobid;
String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
@@ -249,8 +255,14 @@ class JobInProgress {
JobConf default_job_conf = new JobConf(default_conf);
this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
+"/"+jobid + ".xml");
- this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
- +"/"+ jobid + ".jar");
+
+ if (user == null) {
+ this.user = conf.getUser();
+ } else {
+ this.user = user;
+ }
+ LOG.info("User : " + this.user);
+
Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
FileSystem fs = jobDir.getFileSystem(default_conf);
jobFile = new Path(jobDir, "job.xml");
@@ -258,14 +270,9 @@ class JobInProgress {
conf = new JobConf(localJobFile);
this.priority = conf.getJobPriority();
this.status.setJobPriority(this.priority);
- this.profile = new JobProfile(conf.getUser(), jobid,
+ this.profile = new JobProfile(user, jobid,
jobFile.toString(), url, conf.getJobName(),
conf.getQueueName());
- String jarFile = conf.getJar();
- if (jarFile != null) {
- fs.copyToLocalFile(new Path(jarFile), localJarFile);
- conf.setJar(localJarFile.toString());
- }
this.numMapTasks = conf.getNumMapTasks();
this.numReduceTasks = conf.getNumReduceTasks();
@@ -668,6 +675,14 @@ class JobInProgress {
}
/**
+ * Get the job user/owner
+ * @return the job's user/owner
+ */
+ String getUser() {
+ return user;
+ }
+
+ /**
* Return a vector of completed TaskInProgress objects
*/
public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
@@ -2486,10 +2501,6 @@ class JobInProgress {
localFs.delete(localJobFile, true);
localJobFile = null;
}
- if (localJarFile != null) {
- localFs.delete(localJarFile, true);
- localJarFile = null;
- }
// clean up splits
for (int i = 0; i < maps.length; i++) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:23:45 2011
@@ -18,7 +18,14 @@
package org.apache.hadoop.mapred;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -50,6 +57,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
@@ -125,6 +133,8 @@ public class JobTracker implements MRCon
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
+ private static final LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
// system directories are world-wide readable and owner readable
final static FsPermission SYSTEM_DIR_PERMISSION =
FsPermission.createImmutable((short) 0733); // rwx-wx-wx
@@ -1257,13 +1267,39 @@ public class JobTracker implements MRCon
// I. Init the jobs and cache the recovered job history filenames
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
Iterator<JobID> idIter = jobsToRecover.iterator();
+ JobInProgress job = null;
+ File jobIdFile = null;
while (idIter.hasNext()) {
JobID id = idIter.next();
LOG.info("Trying to recover details of job " + id);
try {
- // 1. Create the job object
- JobInProgress job =
- new JobInProgress(id, JobTracker.this, conf, restartCount);
+ // 1. Recover job owner and create JIP
+ jobIdFile =
+ new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
+
+ String user = null;
+ if (jobIdFile != null && jobIdFile.exists()) {
+ LOG.info("File " + jobIdFile + " exists for job " + id);
+ FileInputStream in = new FileInputStream(jobIdFile);
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(in));
+ user = reader.readLine();
+ LOG.info("Recovered user " + user + " for job " + id);
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ in.close();
+ }
+ }
+ if (user == null) {
+ throw new RuntimeException("Incomplete job " + id);
+ }
+
+ // Create the job
+ job = new JobInProgress(id, JobTracker.this, conf, user,
+ restartCount);
// 2. Check if the user has appropriate access
// Get the user group info for the job's owner
@@ -1309,6 +1345,14 @@ public class JobTracker implements MRCon
} catch (Throwable t) {
LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
idIter.remove();
+ if (jobIdFile != null) {
+ jobIdFile.delete();
+ jobIdFile = null;
+ }
+ if (job != null) {
+ job.fail();
+ job = null;
+ }
continue;
}
}
@@ -1475,6 +1519,10 @@ public class JobTracker implements MRCon
Map<String, Node> hostnameToNodeMap =
Collections.synchronizedMap(new TreeMap<String, Node>());
+ // job-id->username during staging
+ Map<JobID, String> jobToUserMap =
+ Collections.synchronizedMap(new TreeMap<JobID, String>());
+
// Number of resolved entries
int numResolved;
@@ -1732,7 +1780,9 @@ public class JobTracker implements MRCon
}
// Same with 'localDir' except it's always on the local disk.
- jobConf.deleteLocalFiles(SUBDIR);
+ if (!hasRestarted) {
+ jobConf.deleteLocalFiles(SUBDIR);
+ }
// Initialize history again if it is not initialized
// because history was on dfs and namenode was in safemode.
@@ -2117,6 +2167,17 @@ public class JobTracker implements MRCon
// mark the job for cleanup at all the trackers
addJobForCleanup(id);
+ try {
+ File userFileForJob =
+ new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id,
+ conf).toString());
+ if (userFileForJob != null) {
+ userFileForJob.delete();
+ }
+ } catch (IOException ioe) {
+ LOG.info("Failed to delete job id mapping for job " + id, ioe);
+ }
+
// add the blacklisted trackers to potentially faulty list
if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
if (job.getNoOfBlackListedTrackers() > 0) {
@@ -2989,7 +3050,17 @@ public class JobTracker implements MRCon
* Allocates a new JobId string.
*/
public synchronized JobID getNewJobId() throws IOException {
- return new JobID(getTrackerIdentifier(), nextJobId++);
+ JobID id = new JobID(getTrackerIdentifier(), nextJobId++);
+
+ // get the user group info
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+ // mark the user for this id
+ jobToUserMap.put(id, ugi.getUserName());
+
+ LOG.info("Job id " + id + " assigned to user " + ugi.getUserName());
+
+ return id;
}
/**
@@ -3005,12 +3076,59 @@ public class JobTracker implements MRCon
//job already running, don't start twice
return jobs.get(jobId).getStatus();
}
+
+ // check if the owner is uploading the splits or not
+ // get the user group info
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+ // check if the user invoking this api is the owner of this job
+ if (!jobToUserMap.get(jobId).equals(ugi.getUserName())) {
+ throw new IOException("User " + ugi.getUserName()
+ + " is not the owner of the job " + jobId);
+ }
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
+ jobToUserMap.remove(jobId);
+
+ // persist
+ File userFileForJob =
+ new File(lDirAlloc.getLocalPathForWrite(SUBDIR + "/" + jobId,
+ conf).toString());
+ if (userFileForJob == null) {
+ LOG.info("Failed to create job-id file for job " + jobId + " at " + userFileForJob);
+ } else {
+ FileOutputStream fout = new FileOutputStream(userFileForJob);
+ BufferedWriter writer = null;
+
+ try {
+ writer = new BufferedWriter(new OutputStreamWriter(fout));
+ writer.write(ugi.getUserName() + "\n");
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ fout.close();
+ }
+
+ LOG.info("Job " + jobId + " user info persisted to file : " + userFileForJob);
+ }
+
+ JobInProgress job = null;
+ try {
+ job = new JobInProgress(jobId, this, this.conf, ugi.getUserName(), 0);
+ } catch (Exception e) {
+ if (userFileForJob != null) {
+ userFileForJob.delete();
+ }
+ throw new IOException(e);
+ }
String queue = job.getProfile().getQueueName();
if(!(queueManager.getQueues().contains(queue))) {
new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+ job.fail();
+ if (userFileForJob != null) {
+ userFileForJob.delete();
+ }
throw new IOException("Queue \"" + queue + "\" does not exist");
}
@@ -3020,6 +3138,10 @@ public class JobTracker implements MRCon
} catch (IOException ioe) {
LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ ". Ignoring job " + jobId, ioe);
+ job.fail();
+ if (userFileForJob != null) {
+ userFileForJob.delete();
+ }
new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
throw ioe;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:23:45 2011
@@ -133,7 +133,7 @@ class LinuxTaskController extends TaskCo
List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
- env.conf.getUser(),
+ context.task.getUser(),
launchTaskJVMArgs, env);
context.shExec = shExec;
shExec.execute();
@@ -207,7 +207,7 @@ class LinuxTaskController extends TaskCo
try {
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.KILL_TASK_JVM,
- context.env.conf.getUser(),
+ context.task.getUser(),
killTaskJVMArgs,
context.env);
shExec.execute();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:23:45 2011
@@ -166,7 +166,7 @@ class LocalJobRunner implements JobSubmi
MapTask map = new MapTask(file.toString(),
mapId, i,
rawSplits[i].getClassName(),
- rawSplits[i].getBytes());
+ rawSplits[i].getBytes(), job.getUser());
JobConf localConf = new JobConf(job);
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
@@ -205,7 +205,7 @@ class LocalJobRunner implements JobSubmi
}
if (!this.isInterrupted()) {
ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size());
+ reduceId, 0, mapIds.size(), job.getUser());
JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar 4 03:23:45 2011
@@ -89,9 +89,9 @@ class MapTask extends Task {
}
public MapTask(String jobFile, TaskAttemptID taskId,
- int partition, String splitClass, BytesWritable split
- ) {
- super(jobFile, taskId, partition);
+ int partition, String splitClass, BytesWritable split,
+ String username) {
+ super(jobFile, taskId, partition, username);
this.splitClass = splitClass;
this.split = split;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:23:45 2011
@@ -150,8 +150,8 @@ class ReduceTask extends Task {
}
public ReduceTask(String jobFile, TaskAttemptID taskId,
- int partition, int numMaps) {
- super(jobFile, taskId, partition);
+ int partition, int numMaps, String username) {
+ super(jobFile, taskId, partition, username);
this.numMaps = numMaps;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:23:45 2011
@@ -136,6 +136,7 @@ abstract class Task implements Writable,
protected TaskAttemptContext taskContext;
protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
protected org.apache.hadoop.mapreduce.OutputCommitter committer;
+ protected String username;
protected final Counters.Counter spilledRecordsCounter;
private String pidFile = "";
protected TaskUmbilicalProtocol umbilical;
@@ -150,7 +151,8 @@ abstract class Task implements Writable,
spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
- public Task(String jobFile, TaskAttemptID taskId, int partition) {
+ public Task(String jobFile, TaskAttemptID taskId, int partition, String username) {
+ this.username = username;
this.jobFile = jobFile;
this.taskId = taskId;
@@ -318,6 +320,9 @@ abstract class Task implements Writable,
return !jobSetup && !jobCleanup && !taskCleanup;
}
+ String getUser() {
+ return username;
+ }
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -331,6 +336,7 @@ abstract class Task implements Writable,
out.writeBoolean(skipping);
out.writeBoolean(jobCleanup);
out.writeBoolean(jobSetup);
+ Text.writeString(out, username);
out.writeBoolean(writeSkipRecs);
out.writeBoolean(taskCleanup);
Text.writeString(out, pidFile);
@@ -348,6 +354,7 @@ abstract class Task implements Writable,
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
jobSetup = in.readBoolean();
+ username = Text.readString(in);
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
if (taskCleanup) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 4 03:23:45 2011
@@ -928,9 +928,10 @@ class TaskInProgress {
} else {
split = new BytesWritable();
}
- t = new MapTask(jobFile, taskid, partition, splitClass, split);
+ t = new MapTask(jobFile, taskid, partition, splitClass, split,
+ job.getUser());
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps);
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, job.getUser());
}
if (jobCleanup) {
t.setJobCleanupTask();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar 4 03:23:45 2011
@@ -75,7 +75,7 @@ public class TestJobQueueTaskScheduler e
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(true);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -90,7 +90,7 @@ public class TestJobQueueTaskScheduler e
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(false);
- Task task = new ReduceTask("", attemptId, 0, 10) {
+ Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Mar 4 03:23:45 2011
@@ -337,6 +337,9 @@ public class TestRecoveryManager extends
RunningJob rJob = jc.submitJob(job);
LOG.info("Submitted first job " + rJob.getID());
+ // wait for 1 min
+ UtilsForTests.waitFor(60000);
+
// kill the jobtracker multiple times and check if the count is correct
for (int i = 1; i <= 5; ++i) {
LOG.info("Stopping jobtracker for " + i + " time");
@@ -369,9 +372,6 @@ public class TestRecoveryManager extends
UtilsForTests.configureWaitingJobConf(job1,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0,
"test-recovery-manager", signalFile, signalFile);
-
- // make sure that the job id's dont clash
- jobtracker.getNewJobId();
// submit a new job
rJob = jc.submitJob(job1);