Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:23:46 UTC

svn commit: r1076934 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/...

Author: omalley
Date: Fri Mar  4 03:23:45 2011
New Revision: 1076934

URL: http://svn.apache.org/viewvc?rev=1076934&view=rev
Log:
commit 2db8f881c11e434698f1bc9c534dae2c277dae1a
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:18 2009 -0700

    Applying patch 2701949.4490.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:23:45 2011
@@ -200,7 +200,7 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -242,7 +242,7 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10) {
+      Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar  4 03:23:45 2011
@@ -67,7 +67,7 @@ public class TestFairScheduler extends T
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -82,7 +82,7 @@ public class TestFairScheduler extends T
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10) {
+      Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 03:23:45 2011
@@ -206,11 +206,11 @@ public class IsolationRunner {
       BytesWritable split = new BytesWritable();
       split.readFields(splitFile);
       splitFile.close();
-      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
+      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, conf.getUser());
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);
-      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
+      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, conf.getUser());
     }
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:23:45 2011
@@ -70,7 +70,6 @@ class JobInProgress {
   JobStatus status;
   Path jobFile = null;
   Path localJobFile = null;
-  Path localJarFile = null;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -185,6 +184,7 @@ class JobInProgress {
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
+  private String user;
   
   // Per-job counters
   public static enum Counter { 
@@ -236,6 +236,12 @@ class JobInProgress {
   
   public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf, int rCount) throws IOException {
+    this(jobid, jobtracker, default_conf, null, rCount);
+  }
+
+  JobInProgress(JobID jobid, JobTracker jobtracker,
+                JobConf default_conf, String user, int rCount) 
+  throws IOException {
     this.restartCount = rCount;
     this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
@@ -249,8 +255,14 @@ class JobInProgress {
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
                                                       +"/"+jobid + ".xml");
-    this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ jobid + ".jar");
+
+    if (user == null) {
+      this.user = conf.getUser();
+    } else {
+      this.user = user;
+    }
+    LOG.info("User : " +  this.user);
+
     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
     FileSystem fs = jobDir.getFileSystem(default_conf);
     jobFile = new Path(jobDir, "job.xml");
@@ -258,14 +270,9 @@ class JobInProgress {
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
     this.status.setJobPriority(this.priority);
-    this.profile = new JobProfile(conf.getUser(), jobid, 
+    this.profile = new JobProfile(user, jobid, 
                                   jobFile.toString(), url, conf.getJobName(),
                                   conf.getQueueName());
-    String jarFile = conf.getJar();
-    if (jarFile != null) {
-      fs.copyToLocalFile(new Path(jarFile), localJarFile);
-      conf.setJar(localJarFile.toString());
-    }
 
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
@@ -668,6 +675,14 @@ class JobInProgress {
   }
     
   /**
+   * Get the job user/owner
+   * @return the job's user/owner
+   */ 
+  String getUser() {
+    return user;
+  }
+
+  /**
    * Return a vector of completed TaskInProgress objects
    */
   public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
@@ -2486,10 +2501,6 @@ class JobInProgress {
         localFs.delete(localJobFile, true);
         localJobFile = null;
       }
-      if (localJarFile != null) {
-        localFs.delete(localJarFile, true);
-        localJarFile = null;
-      }
 
       // clean up splits
       for (int i = 0; i < maps.length; i++) {

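The JobInProgress hunks above thread an explicit owner through a new constructor while keeping the old signature working. A minimal sketch of that delegation and fallback, using a made-up JobRecord class (only the stock org.apache.hadoop.mapred.JobConf API is assumed):

    import org.apache.hadoop.mapred.JobConf;

    // Sketch only, not the real JobInProgress: the old constructor forwards a
    // null user, and the new one falls back to the job configuration's own
    // user when no owner was passed in.
    public class JobRecord {
      private final String user;

      public JobRecord(JobConf conf) {
        this(conf, null);
      }

      JobRecord(JobConf conf, String user) {
        this.user = (user != null) ? user : conf.getUser();
      }

      String getUser() {
        return user;
      }

      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setUser("alice");
        System.out.println(new JobRecord(conf).getUser());   // prints alice
      }
    }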
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:23:45 2011
@@ -18,7 +18,14 @@
 package org.apache.hadoop.mapred;
 
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -50,6 +57,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
@@ -125,6 +133,8 @@ public class JobTracker implements MRCon
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
+  private static final LocalDirAllocator lDirAlloc = 
+                              new LocalDirAllocator("mapred.local.dir");
   // system directories are world-wide readable and owner readable
   final static FsPermission SYSTEM_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0733); // rwx-wx-wx
@@ -1257,13 +1267,39 @@ public class JobTracker implements MRCon
       // I. Init the jobs and cache the recovered job history filenames
       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
       Iterator<JobID> idIter = jobsToRecover.iterator();
+      JobInProgress job = null;
+      File jobIdFile = null;
       while (idIter.hasNext()) {
         JobID id = idIter.next();
         LOG.info("Trying to recover details of job " + id);
         try {
-          // 1. Create the job object
-          JobInProgress job = 
-            new JobInProgress(id, JobTracker.this, conf, restartCount);
+          // 1. Recover job owner and create JIP
+          jobIdFile = 
+            new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
+
+          String user = null;
+          if (jobIdFile != null && jobIdFile.exists()) {
+            LOG.info("File " + jobIdFile + " exists for job " + id);
+            FileInputStream in = new FileInputStream(jobIdFile);
+            BufferedReader reader = null;
+            try {
+              reader = new BufferedReader(new InputStreamReader(in));
+              user = reader.readLine();
+              LOG.info("Recovered user " + user + " for job " + id);
+            } finally {
+              if (reader != null) {
+                reader.close();
+              }
+              in.close();
+            }
+          }
+          if (user == null) {
+            throw new RuntimeException("Incomplete job " + id);
+          }
+
+          // Create the job
+          job = new JobInProgress(id, JobTracker.this, conf, user, 
+                                  restartCount);
 
           // 2. Check if the user has appropriate access
           // Get the user group info for the job's owner
@@ -1309,6 +1345,14 @@ public class JobTracker implements MRCon
         } catch (Throwable t) {
           LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
           idIter.remove();
+          if (jobIdFile != null) {
+            jobIdFile.delete();
+            jobIdFile = null;
+          }
+          if (job != null) {
+            job.fail();
+            job = null;
+          }
           continue;
         }
       }
@@ -1475,6 +1519,10 @@ public class JobTracker implements MRCon
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
   
+  // job-id->username during staging
+  Map<JobID, String> jobToUserMap = 
+    Collections.synchronizedMap(new TreeMap<JobID, String>()); 
+
   // Number of resolved entries
   int numResolved;
     
@@ -1732,7 +1780,9 @@ public class JobTracker implements MRCon
     }
     
     // Same with 'localDir' except it's always on the local disk.
-    jobConf.deleteLocalFiles(SUBDIR);
+    if (!hasRestarted) {
+      jobConf.deleteLocalFiles(SUBDIR);
+    }
 
     // Initialize history again if it is not initialized
     // because history was on dfs and namenode was in safemode.
@@ -2117,6 +2167,17 @@ public class JobTracker implements MRCon
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
 
+    try {
+      File userFileForJob =
+        new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id,
+                                              conf).toString());
+      if (userFileForJob != null) {
+        userFileForJob.delete();
+      }
+    } catch (IOException ioe) {
+      LOG.info("Failed to delete job id mapping for job " + id, ioe);
+    }
+
     // add the blacklisted trackers to potentially faulty list
     if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
       if (job.getNoOfBlackListedTrackers() > 0) {
@@ -2989,7 +3050,17 @@ public class JobTracker implements MRCon
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
-    return new JobID(getTrackerIdentifier(), nextJobId++);
+    JobID id = new JobID(getTrackerIdentifier(), nextJobId++);
+
+    // get the user group info
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+    // mark the user for this id
+    jobToUserMap.put(id, ugi.getUserName());
+
+    LOG.info("Job id " + id + " assigned to user " + ugi.getUserName());
+
+    return id;
   }
 
   /**
@@ -3005,12 +3076,59 @@ public class JobTracker implements MRCon
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }
+
+    // check if the owner is uploding the splits or not
+    // get the user group info
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+
+    // check if the user invoking this api is the owner of this job
+    if (!jobToUserMap.get(jobId).equals(ugi.getUserName())) {
+      throw new IOException("User " + ugi.getUserName() 
+                            + " is not the owner of the job " + jobId);
+    }
     
-    JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    jobToUserMap.remove(jobId);
+
+    // persist
+    File userFileForJob =  
+      new File(lDirAlloc.getLocalPathForWrite(SUBDIR + "/" + jobId, 
+                                              conf).toString());
+    if (userFileForJob == null) {
+      LOG.info("Failed to create job-id file for job " + jobId + " at " + userFileForJob);
+    } else {
+      FileOutputStream fout = new FileOutputStream(userFileForJob);
+      BufferedWriter writer = null;
+
+      try {
+        writer = new BufferedWriter(new OutputStreamWriter(fout));
+        writer.write(ugi.getUserName() + "\n");
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
+        fout.close();
+      }
+
+      LOG.info("Job " + jobId + " user info persisted to file : " + userFileForJob);
+    }
+
+    JobInProgress job = null;
+    try {
+      job = new JobInProgress(jobId, this, this.conf, ugi.getUserName(), 0);
+    } catch (Exception e) {
+      if (userFileForJob != null) {
+        userFileForJob.delete();
+      }
+      throw new IOException(e);
+    }
     
     String queue = job.getProfile().getQueueName();
     if(!(queueManager.getQueues().contains(queue))) {      
       new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
+      job.fail();
+      if (userFileForJob != null) {
+        userFileForJob.delete();
+      }
       throw new IOException("Queue \"" + queue + "\" does not exist");        
     }
 
@@ -3020,6 +3138,10 @@ public class JobTracker implements MRCon
     } catch (IOException ioe) {
        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                 + ". Ignoring job " + jobId, ioe);
+      job.fail();
+      if (userFileForJob != null) {
+        userFileForJob.delete();
+      }
       new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
       throw ioe;
     }

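The JobTracker hunks above persist the submitting user to a per-job file under mapred.local.dir at submission time and read it back when recovering jobs after a restart. A minimal sketch of that round trip, with an illustrative directory and job id (the real code resolves the path through LocalDirAllocator):

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;

    // Sketch of the persist-and-recover pattern, not the JobTracker code itself.
    public class JobOwnerFile {

      // Submission time: remember which user owns this job id.
      static void persistOwner(File dir, String jobId, String userName)
          throws IOException {
        BufferedWriter writer =
          new BufferedWriter(new FileWriter(new File(dir, jobId)));
        try {
          writer.write(userName + "\n");
        } finally {
          writer.close();
        }
      }

      // Recovery time: the first line is the owner; a missing file means the
      // submission never completed, so the job cannot be recovered.
      static String recoverOwner(File dir, String jobId) throws IOException {
        File f = new File(dir, jobId);
        if (!f.exists()) {
          return null;
        }
        BufferedReader reader = new BufferedReader(new FileReader(f));
        try {
          return reader.readLine();
        } finally {
          reader.close();
        }
      }

      public static void main(String[] args) throws IOException {
        File dir = new File(System.getProperty("java.io.tmpdir"));
        persistOwner(dir, "job_201103040323_0001", "alice");
        System.out.println(recoverOwner(dir, "job_201103040323_0001")); // alice
      }
    }

Note that recovery treats a missing owner file as fatal for that job, which is why the patch also stops deleting the local SUBDIR when the JobTracker has restarted.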
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:23:45 2011
@@ -133,7 +133,7 @@ class LinuxTaskController extends TaskCo
     List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
-                                    env.conf.getUser(),
+                                    context.task.getUser(),
                                     launchTaskJVMArgs, env);
     context.shExec = shExec;
     shExec.execute();
@@ -207,7 +207,7 @@ class LinuxTaskController extends TaskCo
     try {
       ShellCommandExecutor shExec = buildTaskControllerExecutor(
                                       TaskCommands.KILL_TASK_JVM,
-                                      context.env.conf.getUser(),
+                                      context.task.getUser(),
                                       killTaskJVMArgs, 
                                       context.env);
       shExec.execute();

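With the owner now carried on the Task itself, the LinuxTaskController hunks above launch and kill task JVMs as the job's submitter rather than as the user from the daemon's configuration. A hedged sketch of just that selection (TaskLike and the wrapper argument layout are illustrative, not the real task-controller contract):

    import java.util.Arrays;
    import java.util.List;

    public class RunAsUserSketch {

      interface TaskLike {
        String getUser();   // owner recorded on the serialized Task
      }

      // Before the patch the daemon's own conf user was used here, which on a
      // multi-user cluster is the TaskTracker account, not the job submitter.
      static List<String> launchCommand(TaskLike task, List<String> jvmArgs) {
        String runAsUser = task.getUser();
        return Arrays.asList("task-controller", runAsUser, "LAUNCH_TASK_JVM",
                             String.join(" ", jvmArgs));
      }
    }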
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:23:45 2011
@@ -166,7 +166,7 @@ class LocalJobRunner implements JobSubmi
             MapTask map = new MapTask(file.toString(),  
                                       mapId, i,
                                       rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes());
+                                      rawSplits[i].getBytes(), job.getUser());
             JobConf localConf = new JobConf(job);
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
@@ -205,7 +205,7 @@ class LocalJobRunner implements JobSubmi
             }
             if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size());
+                                                 reduceId, 0, mapIds.size(), job.getUser());
               JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 03:23:45 2011
@@ -89,9 +89,9 @@ class MapTask extends Task {
   }
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
-                 int partition, String splitClass, BytesWritable split
-                 ) {
-    super(jobFile, taskId, partition);
+                 int partition, String splitClass, BytesWritable split,
+                 String username) {
+    super(jobFile, taskId, partition, username);
     this.splitClass = splitClass;
     this.split = split;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:23:45 2011
@@ -150,8 +150,8 @@ class ReduceTask extends Task {
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
-                    int partition, int numMaps) {
-    super(jobFile, taskId, partition);
+                    int partition, int numMaps, String username) {
+    super(jobFile, taskId, partition, username);
     this.numMaps = numMaps;
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:23:45 2011
@@ -136,6 +136,7 @@ abstract class Task implements Writable,
   protected TaskAttemptContext taskContext;
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
+  protected String username;
   protected final Counters.Counter spilledRecordsCounter;
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
@@ -150,7 +151,8 @@ abstract class Task implements Writable,
     spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
-  public Task(String jobFile, TaskAttemptID taskId, int partition) {
+  public Task(String jobFile, TaskAttemptID taskId, int partition, String username) {
+    this.username = username;
     this.jobFile = jobFile;
     this.taskId = taskId;
      
@@ -318,6 +320,9 @@ abstract class Task implements Writable,
     return !jobSetup && !jobCleanup && !taskCleanup;
   }
   
+  String getUser() {
+    return username;
+  }
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -331,6 +336,7 @@ abstract class Task implements Writable,
     out.writeBoolean(skipping);
     out.writeBoolean(jobCleanup);
     out.writeBoolean(jobSetup);
+    Text.writeString(out, username);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);  
     Text.writeString(out, pidFile);
@@ -348,6 +354,7 @@ abstract class Task implements Writable,
     skipping = in.readBoolean();
     jobCleanup = in.readBoolean();
     jobSetup = in.readBoolean();
+    username = Text.readString(in);
     writeSkipRecs = in.readBoolean();
     taskCleanup = in.readBoolean();
     if (taskCleanup) {

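The Task hunks above add the owner to the Writable wire format; write() and readFields() have to emit and consume fields in exactly the same order, so the username is inserted at the same position in both. A minimal round-trip sketch with a made-up OwnedRecord class (only the stock org.apache.hadoop.io.Text and Writable APIs are assumed):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class OwnedRecord implements Writable {
      private boolean jobSetup;
      private String username = "";

      public void write(DataOutput out) throws IOException {
        out.writeBoolean(jobSetup);
        Text.writeString(out, username);   // new field, same slot on both ends
      }

      public void readFields(DataInput in) throws IOException {
        jobSetup = in.readBoolean();
        username = Text.readString(in);
      }

      public static void main(String[] args) throws IOException {
        OwnedRecord r = new OwnedRecord();
        r.username = "alice";
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        r.write(new DataOutputStream(bytes));

        OwnedRecord copy = new OwnedRecord();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.username);   // prints alice
      }
    }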
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar  4 03:23:45 2011
@@ -928,9 +928,10 @@ class TaskInProgress {
       } else {
         split = new BytesWritable();
       }
-      t = new MapTask(jobFile, taskid, partition, splitClass, split);
+      t = new MapTask(jobFile, taskid, partition, splitClass, split, 
+                      job.getUser());
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, job.getUser());
     }
     if (jobCleanup) {
       t.setJobCleanupTask();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar  4 03:23:45 2011
@@ -75,7 +75,7 @@ public class TestJobQueueTaskScheduler e
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -90,7 +90,7 @@ public class TestJobQueueTaskScheduler e
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10) {
+      Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1076934&r1=1076933&r2=1076934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Mar  4 03:23:45 2011
@@ -337,6 +337,9 @@ public class TestRecoveryManager extends
     RunningJob rJob = jc.submitJob(job);
     LOG.info("Submitted first job " + rJob.getID());
 
+    // wait for 1 min
+    UtilsForTests.waitFor(60000);
+
     // kill the jobtracker multiple times and check if the count is correct
     for (int i = 1; i <= 5; ++i) {
       LOG.info("Stopping jobtracker for " + i + " time");
@@ -369,9 +372,6 @@ public class TestRecoveryManager extends
     UtilsForTests.configureWaitingJobConf(job1, 
         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
         "test-recovery-manager", signalFile, signalFile);
-    
-    // make sure that the job id's dont clash
-    jobtracker.getNewJobId();
 
     // submit a new job
     rJob = jc.submitJob(job1);