You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC

svn commit: r1077679 [5/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/core/org/apache/hadoop/fs/ src/core/org/apa...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:43:33 2011
@@ -55,6 +55,7 @@ import javax.servlet.http.HttpServletRes
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.*;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
@@ -66,17 +67,15 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -157,14 +156,15 @@ public class TaskTracker implements MRCo
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
-  // each job at job localization time. This will be used by TaskLogServlet for
-  // authorizing viewing of task logs of that job
+  //Job ACLs file is created by TaskController under userlogs/$jobid directory
+  //for each job at job localization time. This will be used by TaskLogServlet 
+  //for authorizing viewing of task logs of that job
   static String jobACLsFile = "job-acls.xml";
 
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
+  private String[] localdirs;
   String taskTrackerName;
   String localHostname;
   InetSocketAddress jobTrackAddr;
@@ -242,9 +242,11 @@ public class TaskTracker implements MRCo
   static final String DISTCACHEDIR = "distcache";
   static final String JOBCACHE = "jobcache";
   static final String OUTPUT = "output";
-  private static final String JARSDIR = "jars";
+  static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.info";
   static final String JOBFILE = "job.xml";
+  static final String TT_PRIVATE_DIR = "ttprivate";
+  static final String JVM_EXTRA_ENV_FILE = "jvm.extra.env";
 
   static final String JOB_LOCAL_DIR = "job.local.dir";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
@@ -440,18 +442,28 @@ public class TaskTracker implements MRCo
   static String getLocalJobConfFile(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
+  
+  static String getPrivateDirJobConfFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+  }
 
   static String getTaskConfFile(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
     return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
     + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
+  
+  static String getPrivateDirTaskScriptLocation(String user, String jobid, 
+     String taskid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalTaskDir(user, jobid, taskid);
+  }
 
   static String getJobJarsDir(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
   }
 
-  static String getJobJarFile(String user, String jobid) {
+  public static String getJobJarFile(String user, String jobid) {
     return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
   }
   
@@ -470,7 +482,8 @@ public class TaskTracker implements MRCo
     + TaskTracker.OUTPUT;
   }
 
-  static String getLocalTaskDir(String user, String jobid, String taskid) {
+  public static String getLocalTaskDir(String user, String jobid, 
+      String taskid) {
     return getLocalTaskDir(user, jobid, taskid, false);
   }
   
@@ -492,6 +505,15 @@ public class TaskTracker implements MRCo
   static String getLocalJobTokenFile(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
   }
+  
+  static String getPrivateDirJobTokenFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalJobTokenFile(user, jobid); 
+  }
+  
+  static String getPrivateDirForJob(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid) ;
+  }
 
   private FileSystem getFS(final Path filePath, JobID jobId,
       final Configuration conf) throws IOException, InterruptedException {
@@ -522,6 +544,23 @@ public class TaskTracker implements MRCo
     }
   }
 
+  /**
+   * Delete all of the user directories.
+   * @param conf the TT configuration
+   * @throws IOException
+   */
+  private void deleteUserDirectories(Configuration conf) throws IOException {
+    for(String root: localdirs) {
+      for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+        String owner = status.getOwner();
+        String path = status.getPath().getName();
+        if (path.endsWith("/" + owner)) {
+          taskController.deleteAsUser(owner, "");
+        }
+      }
+    }
+  }
+
   public static final String TT_USER_NAME = "mapreduce.tasktracker.kerberos.principal";
   public static final String TT_KEYTAB_FILE =
     "mapreduce.tasktracker.keytab.file";  
@@ -548,8 +587,18 @@ public class TaskTracker implements MRCo
     }
  
     //check local disk
-    checkLocalDirs(this.fConf.getLocalDirs());
+    checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+    deleteUserDirectories(fConf);
     fConf.deleteLocalFiles(SUBDIR);
+    final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+    }
+    fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+    final FsPermission priv = FsPermission.createImmutable((short) 0700);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+    }
 
     // Clear out state tables
     this.tasks.clear();
@@ -608,14 +657,6 @@ public class TaskTracker implements MRCo
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    Class<? extends TaskController> taskControllerClass = fConf.getClass(
-        "mapred.task.tracker.task-controller", DefaultTaskController.class, TaskController.class);
-    taskController = (TaskController) ReflectionUtils.newInstance(
-        taskControllerClass, fConf);
-
-    // setup and create jobcache directory with appropriate permissions
-    taskController.setup();
-
     // Initialize DistributedCache
     this.distributedCacheManager = new TrackerDistributedCacheManager(
         this.fConf, taskController);
@@ -651,7 +692,7 @@ public class TaskTracker implements MRCo
     reduceLauncher.start();
 
     // create a localizer instance
-    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+    setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
 
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
@@ -897,25 +938,18 @@ public class TaskTracker implements MRCo
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
 
-    // Initialize the user directories if needed.
-    getLocalizer().initializeUserDirs(t.getUser());
-
     synchronized (rjob) {
       if (!rjob.localized) {
-        JobConf localJobConf = localizeJobFiles(t, rjob);
-        // initialize job log directory
-        initializeJobLogDir(jobId, localJobConf);
-
-        // Now initialize the job via task-controller so as to set
-        // ownership/permissions of jars, job-work-dir. Note that initializeJob
-        // should be the last call after every other directory/file to be
-        // directly under the job directory is created.
-        JobInitializationContext context = new JobInitializationContext();
-        context.jobid = jobId;
-        context.user = t.getUser();
-        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
-        taskController.initializeJob(context);
-        
+        Path localJobConfPath = initializeJob(t, rjob);
+        JobConf localJobConf = new JobConf(localJobConfPath);
+        //to be doubly sure, overwrite the user in the config with the one the TT 
+        //thinks it is
+        localJobConf.setUser(t.getUser());
+        //also reset the #tasks per jvm
+        resetNumTasksPerJvm(localJobConf);
+        //set the base jobconf path in rjob; all tasks will use
+        //this as the base path when they run
+        rjob.localizedJobConf = localJobConfPath;
         rjob.jobConf = localJobConf;  
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
@@ -930,35 +964,31 @@ public class TaskTracker implements MRCo
    * Localize the job on this tasktracker. Specifically
    * <ul>
    * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the credentials file</li>
    * <li>Download the job config file job.xml from the FS</li>
-   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
-   * in the configuration.
-   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
-   * file in the configuration.</li>
+   * <li>Invokes the {@link TaskController} to do the rest of the job 
+   * initialization</li>
    * </ul>
    *
    * @param t task whose job has to be localized on this TT
-   * @return the modified job configuration to be used for all the tasks of this
-   *         job as a starting point.
+   * @param rjob the {@link RunningJob}
+   * @return the path to the job configuration to be used for all the tasks
+   *         of this job as a starting point.
    * @throws IOException
    */
-  JobConf localizeJobFiles(Task t, RunningJob rjob)
-      throws IOException, InterruptedException {
-    JobID jobId = t.getJobID();
+  Path initializeJob(final Task t, RunningJob rjob)
+  throws IOException, InterruptedException {
+    final JobID jobId = t.getJobID();
+
+    final Path jobFile = new Path(t.getJobFile());
+    final String userName = t.getUser();
+    final Configuration conf = getJobConf();
 
-    Path jobFile = new Path(t.getJobFile());
-    String userName = t.getUser();
-    JobConf userConf = new JobConf(getJobConf());
-    
-    // Initialize the job directories first
-    FileSystem localFs = FileSystem.getLocal(fConf);
-    getLocalizer().initializeJobDirs(userName, jobId);
-    
     // save local copy of JobToken file
-    String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
-    
-    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -967,107 +997,108 @@ public class TaskTracker implements MRCo
       rjob.ugi.addToken(token);
     }
 
-    FileSystem userFs = getFS(jobFile, jobId, userConf);
+    FileSystem userFs = getFS(jobFile, jobId, conf);
 
     // Download the job.xml for this job from the system FS
-    Path localJobFile =
-        localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
-
+    final Path localJobFile =
+      localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
     JobConf localJobConf = new JobConf(localJobFile);
-    //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
-    //AS PART OF THE TASK OBJECT
-    localJobConf.setUser(userName);
-    
-    // set the location of the token file into jobConf to transfer 
-    // the name to TaskRunner
-    localJobConf.set(TokenCache.JOB_TOKENS_FILENAME, localJobTokenFile);
-    // create the 'job-work' directory: job-specific shared directory for use as
-    // scratch space by all tasks of the same job running on this TaskTracker.
-    Path workDir =
-        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName,
-            jobId.toString()), fConf);
-    if (!localFs.mkdirs(workDir)) {
-      throw new IOException("Mkdirs failed to create "
-          + workDir.toString());
-    }
-    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
-    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
-
-    // Download the job.jar for this job from the system FS
-    localizeJobJarFile(userName, jobId, userFs, localJobConf);
-
-    return localJobConf;
-  }
-
-  // Create job userlog dir.
-  // Create job acls file in job log dir, if needed.
-  void initializeJobLogDir(JobID jobId, JobConf localJobConf)
+
+    // Setup the public distributed cache
+    TaskDistributedCacheManager taskDistributedCacheManager =
+      getTrackerDistributedCacheManager()
+     .newTaskDistributedCacheManager(jobId, localJobConf);
+    rjob.distCacheMgr = taskDistributedCacheManager;
+    taskDistributedCacheManager.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+
+    // Set some config values
+    localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+        getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+    if (conf.get("slave.host.name") != null) {
+      localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+    }
+    resetNumTasksPerJvm(localJobConf);
+    localJobConf.setUser(t.getUser());
+
+    // write back the config (this config will have the updates that the
+    // distributed cache manager makes as well)
+    JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+
+    /**
+      * Now initialize the job via task-controller to do the rest of the
+      * job-init. Do this within a doAs since the distributed cache is also set
+      * up in {@link TaskController#initializeJob(String, JobID, Path, Path)}
+      * To support potential authenticated HDFS accesses, we need the tokens
+      */
+    rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws IOException {
+        try {
+          taskController.initializeJob(t.getUser(), jobId.toString(), 
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this);
+        } catch (IOException e) {
+          try {
+            // called holding lock on RunningJob
+            removeJobFiles(t.getUser(), jobId);
+          } catch (IOException e2) {
+            LOG.warn("Failed to add " + jobId + " to cleanup queue", e2);
+          }
+          throw e;
+        }
+        return null;
+      }
+    });
+    //search for the conf that the initializeJob created
+    //need to look up certain configs from this conf, like
+    //the distributed cache, profiling, etc. ones
+    Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+           userName, jobId.toString()), getJobConf());
+    return initializedConf;
+  }
+  
+  /** If certain configs are enabled, the jvm-reuse should be disabled
+   * @param localJobConf
+   */
+  static void resetNumTasksPerJvm(JobConf localJobConf) {
+    boolean debugEnabled = false;
+    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+      return;
+    }
+    if (localJobConf.getMapDebugScript() != null || 
+        localJobConf.getReduceDebugScript() != null) {
+      debugEnabled = true;
+    }
+    String keepPattern = localJobConf.getKeepTaskFilesPattern();
+    
+    if (debugEnabled || localJobConf.getProfileEnabled() ||
+        keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+      //disable jvm reuse
+      localJobConf.setNumTasksToExecutePerJvm(1);
+    }
+  }
+
+  // Remove the log dir from the tasklog cleanup thread
+  void saveLogDir(JobID jobId, JobConf localJobConf)
       throws IOException {
     // remove it from tasklog cleanup thread first,
     // it might be added there because of tasktracker reinit or restart
     JobStartedEvent jse = new JobStartedEvent(jobId);
     getUserLogManager().addLogEvent(jse);
-    localizer.initializeJobLogDir(jobId);
-
-    if (areACLsEnabled()) {
-      // Create job-acls.xml file in job userlog dir and write the needed
-      // info for authorization of users for viewing task logs of this job.
-      writeJobACLs(localJobConf, TaskLog.getJobDir(jobId));
-    }
-  }
-
-  /**
-   *  Creates job-acls.xml under the given directory logDir and writes
-   *  job-view-acl, queue-admins-acl, jobOwner name and queue name into this
-   *  file.
-   *  queue name is the queue to which the job was submitted to.
-   *  queue-admins-acl is the queue admins ACL of the queue to which this
-   *  job was submitted to.
-   * @param conf   job configuration
-   * @param logDir job userlog dir
-   * @throws IOException
-   */
-  private static void writeJobACLs(JobConf conf, File logDir) throws IOException {
-    File aclFile = new File(logDir, jobACLsFile);
-    JobConf aclConf = new JobConf(false);
-
-    // set the job view acl in aclConf
-    String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " ");
-    aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL);
-
-    // set the job queue name in aclConf
-    String queue = conf.getQueueName();
-    aclConf.setQueueName(queue);
-
-    // set the queue admins acl in aclConf
-    String qACLName = QueueManager.toFullPropertyName(queue,
-        QueueACL.ADMINISTER_JOBS.getAclName());
-    String queueAdminsACL = conf.get(qACLName, " ");
-    aclConf.set(qACLName, queueAdminsACL);
-
-    // set jobOwner as user.name in aclConf
-    String jobOwner = conf.getUser();
-    aclConf.set("user.name", jobOwner);
-
-    FileOutputStream out = new FileOutputStream(aclFile);
-    try {
-      aclConf.writeXml(out);
-    } finally {
-      out.close();
-    }
-    Localizer.PermissionsHandler.setPermissions(aclFile,
-        Localizer.PermissionsHandler.sevenZeroZero);
   }
 
+  
   /**
    * Download the job configuration file from the FS.
    *
-   * @param t Task whose job file has to be downloaded
-   * @param jobId jobid of the task
+   * @param jobFile the original location of the configuration file
+   * @param user the user in question
+   * @param userFs the FileSystem created on behalf of the user
+   * @param jobId jobid in question
    * @return the local file system path of the downloaded file.
    * @throws IOException
    */
-  private Path localizeJobConfFile(Path jobFile, String user, FileSystem userFs, JobID jobId)
+  private Path localizeJobConfFile(Path jobFile, String user, 
+      FileSystem userFs, JobID jobId)
   throws IOException {
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
@@ -1080,7 +1111,7 @@ public class TaskTracker implements MRCo
       jobFileSize = -1;
     }
     Path localJobFile =
-      lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user,
+      lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
           jobId.toString()), jobFileSize, fConf);
 
     // Download job.xml
@@ -1088,58 +1119,16 @@ public class TaskTracker implements MRCo
     return localJobFile;
   }
 
-  /**
-   * Download the job jar file from FS to the local file system and unjar it.
-   * Set the local jar file in the passed configuration.
-   *
-   * @param jobId
-   * @param userFs
-   * @param localJobConf
-   * @throws IOException
-   */
-  private void localizeJobJarFile(String user, JobID jobId, FileSystem userFs,
-      JobConf localJobConf)
-  throws IOException {
-    // copy Jar file to the local FS and unjar it.
-    String jarFile = localJobConf.getJar();
-    FileStatus status = null;
-    long jarFileSize = -1;
-    if (jarFile != null) {
-      Path jarFilePath = new Path(jarFile);
-      try {
-        status = userFs.getFileStatus(jarFilePath);
-        jarFileSize = status.getLen();
-      } catch (FileNotFoundException fe) {
-        jarFileSize = -1;
-      }
-      // Here we check for five times the size of jarFileSize to accommodate for
-      // unjarring the jar file in the jars directory
-      Path localJarFile =
-        lDirAlloc.getLocalPathForWrite(
-            getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
-      //Download job.jar
-      userFs.copyToLocalFile(jarFilePath, localJarFile);
-
-      localJobConf.setJar(localJarFile.toString());
-
-      // Also un-jar the job.jar files. We un-jar it so that classes inside
-      // sub-directories, for e.g., lib/, classes/ are available on class-path
-      RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
-          .getParent().toString()));
-    }
-  }
-
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
-      UserGroupInformation ugi) throws IOException {
+                                RunningJob rjob) throws IOException {
     synchronized (tip) {
       tip.setJobConf(jobConf);
-      tip.setUGI(ugi);
-      tip.launchTask();
+      tip.setUGI(rjob.ugi);
+      tip.launchTask(rjob);
     }
   }
     
-  public synchronized void shutdown() throws IOException {
+  public synchronized void shutdown() throws IOException, InterruptedException {
     shuttingDown = true;
     close();
     if (this.server != null) {
@@ -1156,8 +1145,9 @@ public class TaskTracker implements MRCo
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws InterruptedException 
    */
-  public synchronized void close() throws IOException {
+  public synchronized void close() throws IOException, InterruptedException {
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1246,8 +1236,15 @@ public class TaskTracker implements MRCo
     // objects
     FileSystem local = FileSystem.getLocal(conf);
     this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    Class<? extends TaskController> taskControllerClass = 
+      conf.getClass("mapred.task.tracker.task-controller", 
+                     DefaultTaskController.class, TaskController.class);
+   taskController = 
+     (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+   taskController.setup(localDirAllocator);
+
     // create user log manager
-    setUserLogManager(new UserLogManager(conf));
+    setUserLogManager(new UserLogManager(conf, taskController));
     SecurityUtil.login(originalConf, TT_KEYTAB_FILE, TT_USER_NAME);
 
     initialize();
@@ -1688,70 +1685,6 @@ public class TaskTracker implements MRCo
   }
 
   /**
-   * Builds list of PathDeletionContext objects for the given paths
-   */
-  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
-      Path[] paths) {
-    int i = 0;
-    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
-    }
-    return contexts;
-  }
-
-  /**
-   * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a 
-   * job each pointing to the job's jobLocalDir.
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param id    : {@link JobID} of the job for which the local-dir needs to 
-   *                be cleaned up.
-   * @param user  : Job owner's username
-   * @param taskController : the task-controller to be used for deletion of
-   *                         jobLocalDir
-   */
-  static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
-      FileSystem fs, Path[] paths, JobID id, String user,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
-                                                               taskController);
-    }
-    return contexts;
-  } 
-  
-  /**
-   * Builds list of TaskControllerTaskPathDeletionContext objects for a task
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
-   * @param isWorkDir : the dir to be deleted is workDir or taskDir
-   * @param taskController : the task-controller to be used for deletion of
-   *                         taskDir or taskWorkDir
-   */
-  static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
-      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
-                          isWorkDir, taskController);
-    }
-    return contexts;
-  }
-
-  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException
@@ -1778,7 +1711,7 @@ public class TaskTracker implements MRCo
         }
         // Delete the job directory for this  
         // task if the job is done/failed
-        if (!rjob.keepJobFiles) {
+        if (!rjob.keepJobFiles && rjob.localized) {
           removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
         }
         // add job to user log manager
@@ -1812,12 +1745,21 @@ public class TaskTracker implements MRCo
    * @param rjob
    * @throws IOException
    */
-  void removeJobFiles(String user, JobID jobId)
-  throws IOException {
-    PathDeletionContext[] contexts = 
-      buildTaskControllerJobPathDeletionContexts(localFs, 
-          getLocalFiles(fConf, ""), jobId, user, taskController);
-    directoryCleanupThread.addToQueue(contexts);
+  void removeJobFiles(String user, JobID jobId) throws IOException {
+    String userDir = getUserDir(user);
+    String jobDir = getLocalJobDir(user, jobId.toString());
+    PathDeletionContext jobCleanup = 
+      new TaskController.DeletionContext(getTaskController(), false, user, 
+                                         jobDir.substring(userDir.length()));
+    directoryCleanupThread.addToQueue(jobCleanup);
+    
+    for (String str : localdirs) {
+      Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+        new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+      PathDeletionContext ttPrivateJobCleanup =
+        new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+      directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+    }
   }
 
   /**
@@ -2113,12 +2055,14 @@ public class TaskTracker implements MRCo
    * Start a new task.
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
+   * @throws InterruptedException 
    */
-  void startNewTask(TaskInProgress tip) {
+  void startNewTask(TaskInProgress tip) throws InterruptedException {
     try {
       RunningJob rjob = localizeJob(tip);
+      tip.getTask().setJobFile(rjob.localizedJobConf.toString());
       // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
-      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); 
+      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2128,8 +2072,9 @@ public class TaskTracker implements MRCo
         tip.kill(true);
         tip.cleanup(true);
       } catch (IOException ie2) {
-        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
-                 StringUtils.stringifyException(ie2));          
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+      } catch (InterruptedException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
       }
         
       // Careful! 
@@ -2238,7 +2183,7 @@ public class TaskTracker implements MRCo
     private TaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
-    private JobConf defaultJobConf;
+    private JobConf ttConf;
     private JobConf localJobConf;
     private boolean keepFailedTaskFiles;
     private boolean alwaysKeepTaskFiles;
@@ -2270,7 +2215,7 @@ public class TaskTracker implements MRCo
       this.task = task;
       this.launcher = launcher;
       this.lastProgressReport = System.currentTimeMillis();
-      this.defaultJobConf = conf;
+      this.ttConf = conf;
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
@@ -2289,69 +2234,10 @@ public class TaskTracker implements MRCo
         
     void localizeTask(Task task) throws IOException{
 
-      FileSystem localFs = FileSystem.getLocal(fConf);
-
-      // create taskDirs on all the disks.
-      getLocalizer().initializeAttemptDirs(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-
-      // create the working-directory of the task 
-      Path cwd =
-          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
-              .getJobID().toString(), task.getTaskID().toString(), task
-              .isTaskCleanupTask()), defaultJobConf);
-      if (!localFs.mkdirs(cwd)) {
-        throw new IOException("Mkdirs failed to create " 
-                    + cwd.toString());
-      }
-
-      localJobConf.set("mapred.local.dir",
-                       fConf.get("mapred.local.dir"));
-
-      if (fConf.get("slave.host.name") != null) {
-        localJobConf.set("slave.host.name",
-                         fConf.get("slave.host.name"));
-      }
-            
-      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
       // Do the task-type specific localization
+//TODO: are these calls really required
       task.localizeConfiguration(localJobConf);
       
-      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
-      if (staticResolutions != null && staticResolutions.size() > 0) {
-        StringBuffer str = new StringBuffer();
-
-        for (int i = 0; i < staticResolutions.size(); i++) {
-          String[] hostToResolved = staticResolutions.get(i);
-          str.append(hostToResolved[0]+"="+hostToResolved[1]);
-          if (i != staticResolutions.size() - 1) {
-            str.append(',');
-          }
-        }
-        localJobConf.set("hadoop.net.static.resolutions", str.toString());
-      }
-      if (task.isMapTask()) {
-        debugCommand = localJobConf.getMapDebugScript();
-      } else {
-        debugCommand = localJobConf.getReduceDebugScript();
-      }
-      String keepPattern = localJobConf.getKeepTaskFilesPattern();
-      if (keepPattern != null) {
-        alwaysKeepTaskFiles = 
-          Pattern.matches(keepPattern, task.getTaskID().toString());
-      } else {
-        alwaysKeepTaskFiles = false;
-      }
-      if (debugCommand != null || localJobConf.getProfileEnabled() ||
-          alwaysKeepTaskFiles || keepFailedTaskFiles) {
-        //disable jvm reuse
-        localJobConf.setNumTasksToExecutePerJvm(1);
-      }
-      if (isTaskMemoryManagerEnabled()) {
-        localJobConf.setBoolean("task.memory.mgmt.enabled", true);
-      }
       task.setConf(localJobConf);
     }
         
@@ -2374,6 +2260,18 @@ public class TaskTracker implements MRCo
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
       taskTimeout = localJobConf.getLong("mapred.task.timeout", 
                                          10 * 60 * 1000);
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
+      }
+      String keepPattern = localJobConf.getKeepTaskFilesPattern();
+      if (keepPattern != null) {
+        alwaysKeepTaskFiles = 
+          Pattern.matches(keepPattern, task.getTaskID().toString());
+      } else {
+        alwaysKeepTaskFiles = false;
+      }
     }
         
     public synchronized JobConf getJobConf() {
@@ -2394,7 +2292,7 @@ public class TaskTracker implements MRCo
     /**
      * Kick off the task execution
      */
-    public synchronized void launchTask() throws IOException {
+    public synchronized void launchTask(RunningJob rjob) throws IOException {
       if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
           this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
           this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2402,7 +2300,7 @@ public class TaskTracker implements MRCo
         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
         }
-        setTaskRunner(task.createRunner(TaskTracker.this, this));
+        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
@@ -2466,8 +2364,10 @@ public class TaskTracker implements MRCo
               "Groups=" + taskCounters.getGroupNames().size() + " Limit=" +
               Counters.MAX_GROUP_LIMIT);
           kill(true);
-        } catch(IOException ie) {
-          LOG.error("Error killing task " + task.getTaskID(), ie);
+        } catch (IOException e) {
+          LOG.error("Error killing task " + task.getTaskID(), e);
+        } catch (InterruptedException e) {
+          LOG.error("Error killing task " + task.getTaskID(), e);
         }
       }
       
@@ -2828,7 +2728,12 @@ public class TaskTracker implements MRCo
             getRunState() == TaskStatus.State.UNASSIGNED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING ||
             isCleaningup()) {
-          kill(wasFailure);
+          try {
+            kill(wasFailure);
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted while killing " +
+                getTask().getTaskID(), e);
+          }
         }
       }
       
@@ -2839,8 +2744,10 @@ public class TaskTracker implements MRCo
     /**
      * Something went wrong and the task must be killed.
      * @param wasFailure was it a failure (versus a kill request)?
+     * @throws InterruptedException 
      */
-    public synchronized void kill(boolean wasFailure) throws IOException {
+    public synchronized void kill(boolean wasFailure
+                                  ) throws IOException, InterruptedException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
           isCleaningup()) {
@@ -2942,7 +2849,7 @@ public class TaskTracker implements MRCo
           return;
         }
         try {
-          removeTaskFiles(needCleanup, taskId);
+          removeTaskFiles(needCleanup);
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));
@@ -2954,47 +2861,27 @@ public class TaskTracker implements MRCo
      * Some or all of the files from this task are no longer required. Remove
      * them via CleanupQueue.
      * 
-     * @param needCleanup
+     * @param removeOutputs remove outputs as well as output
      * @param taskId
      * @throws IOException 
      */
-    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
-        throws IOException {
-      if (needCleanup) {
-        if (runner != null) {
-          // cleans up the output directory of the task (where map outputs
-          // and reduce inputs get stored)
-          runner.close();
-        }
-
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          // No jvm reuse, remove everything
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-                getLocalFiles(fConf, ""), task, false/* not workDir */,
-                taskController);
-          directoryCleanupThread.addToQueue(contexts);
+    void removeTaskFiles(boolean removeOutputs) throws IOException {
+      if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+        String user = ugi.getShortUserName();
+        int userDirLen = TaskTracker.getUserDir(user).length();
+        String jobId = task.getJobID().toString();
+        String taskId = task.getTaskID().toString();
+        boolean cleanup = task.isTaskCleanupTask();
+        String taskDir;
+        if (!removeOutputs) {
+          taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
         } else {
-          // Jvm reuse. We don't delete the workdir since some other task
-          // (running in the same JVM) might be using the dir. The JVM
-          // running the tasks would clean the workdir per a task in the
-          // task process itself.
-          String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
-          PathDeletionContext[] contexts = buildPathDeletionContexts(
-              localFs, getLocalFiles(defaultJobConf, localTaskDir +
-                         Path.SEPARATOR + TaskTracker.JOBFILE));
-          directoryCleanupThread.addToQueue(contexts);
-        }
-      } else {
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-              getLocalFiles(fConf, ""), task, true /* workDir */,
-              taskController);
-          directoryCleanupThread.addToQueue(contexts);
+          taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
         }
+        PathDeletionContext item =
+          new TaskController.DeletionContext(taskController, false, user,
+                                             taskDir.substring(userDirLen));          
+        directoryCleanupThread.addToQueue(item);
       }
     }
         
@@ -3033,7 +2920,11 @@ public class TaskTracker implements MRCo
     if (rjob == null) { //kill the JVM since the job is dead
       LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
                " is dead");
-      jvmManager.killJvm(jvmId);
+      try {
+        jvmManager.killJvm(jvmId);
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to kill " + jvmId, e);
+      }
       return new JvmTask(null, true);
     }
     TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3226,12 +3117,15 @@ public class TaskTracker implements MRCo
   static class RunningJob{
     private JobID jobid; 
     private JobConf jobConf;
+    private Path localizedJobConf;
     // keep this for later use
     volatile Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
     UserGroupInformation ugi;
     FetchStatus f;
+    TaskDistributedCacheManager distCacheMgr;
+    
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3850,7 +3744,7 @@ public class TaskTracker implements MRCo
       jobTokenSize = status.getLen();
       
       Path localJobTokenFile =
-          lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
+          lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user, 
               jobId.toString()), jobTokenSize, fConf);
     
       String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
@@ -3936,4 +3830,13 @@ public class TaskTracker implements MRCo
     return map;
   }
   // End MXBean implemenation
+
+  @Override
+  public void 
+  updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                     long[] sizes
+                                     ) throws IOException {
+    distributedCacheManager.setArchiveSizes(jobId, sizes);
+  }
+
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar  4 04:43:33 2011
@@ -157,4 +157,13 @@ public interface TaskUmbilicalProtocol e
                                                        TaskAttemptID id) 
   throws IOException;
 
+  /**
+   * The job initializer needs to report the sizes of the archive
+   * objects in the private distributed cache.
+   * @param jobId the job to update
+   * @param sizes the array of sizes that were computed
+   * @throws IOException
+   */
+  void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                          long[] sizes) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/UserLogCleaner.java Fri Mar  4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -118,7 +119,7 @@ public class UserLogCleaner extends Thre
   }
 
   public void deleteJobLogs(JobID jobid) throws IOException {
-    deleteLogPath(TaskLog.getJobDir(jobid).getAbsolutePath());
+    deleteLogPath(jobid.toString());
   }
 
   /**
@@ -134,26 +135,22 @@ public class UserLogCleaner extends Thre
   public void clearOldUserLogs(Configuration conf) throws IOException {
     File userLogDir = TaskLog.getUserLogDir();
     if (userLogDir.exists()) {
-      String[] logDirs = userLogDir.list();
-      if (logDirs.length > 0) {
+      long now = clock.getTime();
+      for(String logDir: userLogDir.list()) {
         // add all the log dirs to taskLogsMnonitor.
-        long now = clock.getTime();
-        for (String logDir : logDirs) {
-          JobID jobid = null;
-          try {
-            jobid = JobID.forName(logDir);
-          } catch (IllegalArgumentException ie) {
-            // if the directory is not a jobid, delete it immediately
-            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
-            continue;
-          }
-          // add the job log directory for deletion with default retain hours,
-          // if it is not already added
-          if (!completedJobs.containsKey(jobid)) {
-            JobCompletedEvent jce = new JobCompletedEvent(jobid, now,
-                getUserlogRetainHours(conf));
-            userLogManager.addLogEvent(jce);
-          }
+        JobID jobid = null;
+        try {
+          jobid = JobID.forName(logDir);
+        } catch (IllegalArgumentException ie) {
+          deleteLogPath(logDir);
+          continue;
+        }
+        // add the job log directory for deletion with default retain hours,
+        // if it is not already added
+        if (!completedJobs.containsKey(jobid)) {
+          JobCompletedEvent jce = 
+            new JobCompletedEvent(jobid, now,getUserlogRetainHours(conf));
+          userLogManager.addLogEvent(jce);
         }
       }
     }
@@ -208,7 +205,11 @@ public class UserLogCleaner extends Thre
    */
   private void deleteLogPath(String logPath) throws IOException {
     LOG.info("Deleting user log path " + logPath);
-    PathDeletionContext context = new PathDeletionContext(localFs, logPath);
-    cleanupQueue.addToQueue(context);
+    String logRoot = TaskLog.getUserLogDir().toString();
+    String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    TaskController controller = userLogManager.getTaskController();
+    PathDeletionContext item = 
+      new TaskController.DeletionContext(controller, true, user, logPath);
+    cleanupQueue.addToQueue(item);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar  4 04:43:33 2011
@@ -156,7 +156,7 @@ public class TokenCache {
    * @throws IOException
    */
   //@InterfaceAudience.Private
-  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar  4 04:43:33 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.JobID;
 
 /**
@@ -44,19 +43,16 @@ public class Localizer {
 
   private FileSystem fs;
   private String[] localDirs;
-  private TaskController taskController;
 
   /**
    * Create a Localizer instance
    * 
    * @param fileSys
    * @param lDirs
-   * @param tc
    */
-  public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+  public Localizer(FileSystem fileSys, String[] lDirs) {
     fs = fileSys;
     localDirs = lDirs;
-    taskController = tc;
   }
 
   /**
@@ -264,13 +260,6 @@ public class Localizer {
                 + user);
       }
 
-      // Now, run the task-controller specific code to initialize the
-      // user-directories.
-      InitializationContext context = new InitializationContext();
-      context.user = user;
-      context.workDir = null;
-      taskController.initializeUser(context);
-
       // Localization of the user is done
       localizedUser.set(true);
     }
@@ -283,7 +272,7 @@ public class Localizer {
    * <br>
    * Here, we set 700 permissions on the job directories created on all disks.
    * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+   * {@link TaskController#initializeJob} is run at a
    * later time to set proper private permissions on the job directories. <br>
    * 
    * @param user
@@ -331,16 +320,15 @@ public class Localizer {
    * @param user
    * @param jobId
    * @param attemptId
-   * @param isCleanupAttempt
    * @throws IOException
    */
   public void initializeAttemptDirs(String user, String jobId,
-      String attemptId, boolean isCleanupAttempt)
+      String attemptId)
       throws IOException {
 
     boolean initStatus = false;
     String attemptDirPath =
-        TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+        TaskTracker.getLocalTaskDir(user, jobId, attemptId);
 
     for (String localDir : localDirs) {
       Path localAttemptDir = new Path(localDir, attemptDirPath);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/userlogs/UserLogManager.java Fri Mar  4 04:43:33 2011
@@ -24,9 +24,12 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLogsTruncater;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.UserLogCleaner;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * This manages user logs on the {@link TaskTracker}.
@@ -37,6 +40,7 @@ public class UserLogManager {
     new LinkedBlockingQueue<UserLogEvent>();
   private TaskLogsTruncater taskLogsTruncater;
   private UserLogCleaner userLogCleaner;
+  private final TaskController taskController;
 
   private Thread monitorLogEvents = new Thread() {
     @Override
@@ -56,18 +60,50 @@ public class UserLogManager {
    * 
    * It should be explicitly started using {@link #start()} to start functioning
    * 
-   * @param conf
-   *          The {@link Configuration}
+   * @param conf The {@link Configuration}
+   * @param taskController The task controller to delete the log files
+   * 
+   * @throws IOException
+   */
+  public UserLogManager(Configuration conf,
+                        TaskController taskController) throws IOException {
+    this.taskController = taskController;
+    setFields(conf);
+  }
+  
+  /**
+   * Create the user log manager to manage user logs on {@link TaskTracker}.
+   * This constructor is there mainly for unit tests.
    * 
+   * @param conf The {@link Configuration}
+   *
    * @throws IOException
    */
   public UserLogManager(Configuration conf) throws IOException {
+    Class<? extends TaskController> taskControllerClass = 
+      conf.getClass("mapred.task.tracker.task-controller", 
+                     DefaultTaskController.class, TaskController.class);
+    TaskController taskController = 
+     (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+    this.taskController = taskController;
+    setFields(conf);
+  }
+  
+  private void setFields(Configuration conf) throws IOException {
     taskLogsTruncater = new TaskLogsTruncater(conf);
     userLogCleaner = new UserLogCleaner(this, conf);
     monitorLogEvents.setDaemon(true);
   }
 
   /**
+   * Get the taskController for deleting logs.
+   * @return the TaskController
+   */
+  public TaskController getTaskController() {
+    return taskController;
+  }
+
+  /**
    * Starts managing the logs
    */
   public void start() {
@@ -101,13 +137,12 @@ public class UserLogManager {
    *          TT's conf
    * @throws IOException
    */
-  public void clearOldUserLogs(Configuration conf)
-      throws IOException {
+  public void clearOldUserLogs(Configuration conf) throws IOException {
     userLogCleaner.clearOldUserLogs(conf);
   }
 
   private void doJvmFinishedAction(JvmFinishedEvent event) {
-    taskLogsTruncater.truncateLogs(event.getJvmInfo());
+    // do nothing
   }
 
   private void doJobStartedAction(JobStartedEvent event) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar  4 04:43:33 2011
@@ -34,6 +34,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobLocalizer;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapreduce.Job;
@@ -69,7 +71,9 @@ public class TestTrackerDistributedCache
   private static final int LOCAL_CACHE_SUBDIR = 2;
   protected Configuration conf;
   protected Path firstCacheFile;
+  protected Path firstCacheFilePublic;
   protected Path secondCacheFile;
+  protected Path secondCacheFilePublic;
   private FileSystem fs;
 
   protected LocalDirAllocator localDirAllocator = 
@@ -107,18 +111,22 @@ public class TestTrackerDistributedCache
         taskControllerClass, conf);
 
     // setup permissions for mapred local dir
-    taskController.setup();
+    taskController.setup(localDirAllocator);
 
     // Create the temporary cache files to be used in the tests.
     firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
     secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
+    firstCacheFilePublic = new Path(TEST_ROOT_DIR, "firstcachefileOne");
+    secondCacheFilePublic = new Path(TEST_ROOT_DIR, "secondcachefileOne");
+    createPublicTempFile(firstCacheFilePublic);
+    createPublicTempFile(secondCacheFilePublic);
     createPrivateTempFile(firstCacheFile);
     createPrivateTempFile(secondCacheFile);
   }
   
   protected void refreshConf(Configuration conf) throws IOException {
     taskController.setConf(conf);
-    taskController.setup();
+    taskController.setup(localDirAllocator);
   }
 
   /**
@@ -146,6 +154,7 @@ public class TestTrackerDistributedCache
     Configuration subConf = new Configuration(conf);
     String userName = getJobOwnerName();
     subConf.set("user.name", userName);
+    JobID jobid = new JobID("jt",1);
     DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
     DistributedCache.addFileToClassPath(secondCacheFile, subConf, 
                                         FileSystem.get(subConf));
@@ -162,13 +171,16 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager manager = 
       new TrackerDistributedCacheManager(conf, taskController);
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(jobid, subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
-    // ****** End of imitating TaskRunner code
+    handle.setupCache(TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(subConf);
+    // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO)
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
+//    // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
     assertNotNull(null, localCacheFiles);
@@ -201,14 +213,17 @@ public class TestTrackerDistributedCache
     }
 
     @Override
-    Path localizeCache(Configuration conf, URI cache, long confFileStamp,
-        CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive,
-        boolean isPublic) throws IOException {
-      if (cache.equals(firstCacheFile.toUri())) {
+    Path localizePublicCacheObject(Configuration conf, URI cache, 
+                                   long confFileStamp,
+                                   CacheStatus cacheStatus, 
+                                   FileStatus fileStatus, 
+                                   boolean isArchive) throws IOException {
+      if (cache.equals(firstCacheFilePublic.toUri())) {
         throw new IOException("fake fail");
       }
-      return super.localizeCache(conf, cache, confFileStamp, cacheStatus,
-          fileStatus, isArchive, isPublic);
+      return super.localizePublicCacheObject(conf, cache, confFileStamp, 
+                                             cacheStatus, fileStatus, 
+                                             isArchive);
     }
   }
 
@@ -234,10 +249,10 @@ public class TestTrackerDistributedCache
 
     // Task localizing for first job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     handle.release();
     for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) {
       assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp, 
@@ -252,7 +267,7 @@ public class TestTrackerDistributedCache
     Configuration conf2 = job2.getConfiguration();
     conf2.set("user.name", userName);
     // add a file that would get failed to localize
-    DistributedCache.addCacheFile(firstCacheFile.toUri(), conf2);
+    DistributedCache.addCacheFile(firstCacheFilePublic.toUri(), conf2);
     // add a file that is already localized by different job
     DistributedCache.addCacheFile(secondCacheFile.toUri(), conf2);
     // add a file that is never localized
@@ -263,12 +278,12 @@ public class TestTrackerDistributedCache
 
     // Task localizing for second job
     // localization for the "firstCacheFile" will fail.
-    handle = manager.newTaskDistributedCacheManager(conf2);
+    handle = manager.newTaskDistributedCacheManager(new JobID("jt", 2), conf2);
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+      JobLocalizer.downloadPrivateCache(conf2);
     } catch (IOException e) {
       th = e;
       LOG.info("Exception during setup", e);
@@ -316,6 +331,26 @@ public class TestTrackerDistributedCache
     }
   }
 
+  private void appendBooleanArray(StringBuilder buffer, boolean[] data) {
+    if (data != null && data.length != 0) {
+      buffer.append(data[0]);
+      for(int i=1; i < data.length; i++) {
+        buffer.append(',');
+        buffer.append(data[i]);
+      }
+    }
+  }
+
+  private void appendLongArray(StringBuilder buffer, long[] data) {
+    if (data != null && data.length != 0) {
+      buffer.append(data[0]);
+      for(int i=1; i < data.length; i++) {
+        buffer.append(',');
+        buffer.append(data[i]);
+      }
+    }
+  }
+
   private void appendUriArray(StringBuilder buffer, URI[] data) {
     if (data != null && data.length != 0) {
       buffer.append(data[0]);
@@ -333,15 +368,15 @@ public class TestTrackerDistributedCache
     buf.append("\nArchives:");
     appendUriArray(buf, DistributedCache.getCacheArchives(conf1));
     buf.append("\nFile Visible:");
-    appendStringArray(buf, TrackerDistributedCacheManager.getFileVisibilities
+    appendBooleanArray(buf, TrackerDistributedCacheManager.getFileVisibilities
                       (conf1));
     buf.append("\nArchive Visible:");
-    appendStringArray(buf, TrackerDistributedCacheManager.getArchiveVisibilities
+    appendBooleanArray(buf, TrackerDistributedCacheManager.getArchiveVisibilities
                       (conf1));
     buf.append("\nFile timestamps:");
-    appendStringArray(buf, DistributedCache.getFileTimestamps(conf1));
+    appendLongArray(buf, DistributedCache.getFileTimestamps(conf1));
     buf.append("\nArchive timestamps:");
-    appendStringArray(buf, DistributedCache.getArchiveTimestamps(conf1));
+    appendLongArray(buf, DistributedCache.getArchiveTimestamps(conf1));
     LOG.info("state = " + buf.toString());
   }
   
@@ -367,10 +402,10 @@ public class TestTrackerDistributedCache
 
     // Task localizing for job
     TaskDistributedCacheManager handle = manager
-        .newTaskDistributedCacheManager(conf1);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), 
-          TaskTracker.getPublicDistributedCacheDir());
+        .newTaskDistributedCacheManager(new JobID("jt", 1), conf1);
+    handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    JobLocalizer.downloadPrivateCache(conf1);
     TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0);
     String distCacheDir;
     if (visibility) {
@@ -381,8 +416,7 @@ public class TestTrackerDistributedCache
     Path localizedPath =
       manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir,
           fs.getFileStatus(cacheFile), false,
-          c.timestamp, new Path(TEST_ROOT_DIR), false,
-          visibility);
+          c.timestamp, visibility);
     assertTrue("Cache file didn't get localized in the expected directory. " +
         "Expected localization to happen within " + 
         ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir +
@@ -494,20 +528,21 @@ public class TestTrackerDistributedCache
     conf2.set("user.name", userName);
 
     // We first test the size limit
-    Path firstLocalCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, 
+    Path firstLocalCache = manager.getLocalCache(firstCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
-    manager.releaseCache(firstCacheFile.toUri(), conf2, now, 
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
+    manager.releaseCache(firstCacheFilePublic.toUri(), conf2, 
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), 
         TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     //in above code,localized a file of size 4K and then release the cache 
     // which will cause the cache be deleted when the limit goes out. 
     // The below code localize another cache which's designed to
     //sweep away the first cache.
-    Path secondLocalCache = manager.getLocalCache(secondCacheFile.toUri(), conf2, 
+    Path secondLocalCache = manager.getLocalCache(secondCacheFilePublic.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(secondCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(secondCacheFilePublic), false, 
+        fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache store is full.",
         localfs.exists(firstLocalCache));
@@ -530,7 +565,6 @@ public class TestTrackerDistributedCache
     System.err.println("That directory ends up with "
                        + localfs.listStatus(firstCursor).length
                        + " subdirectories");
-
     Path cachesBase = firstCursor;
 
     assertFalse
@@ -545,21 +579,23 @@ public class TestTrackerDistributedCache
     Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
     // Adding two more small files, so it triggers the number of sub directory
     // limit but does not trigger the file size limit.
-    createTempFile(thirdCacheFile, 1);
-    createTempFile(fourthCacheFile, 1);
+    createPublicTempFile(thirdCacheFile);
+    createPublicTempFile(fourthCacheFile);
     Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(thirdCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(thirdCacheFile).getModificationTime(), 
+        true);
     // Release the third cache so that it can be deleted while sweeping
-    manager.releaseCache(thirdCacheFile.toUri(), conf2, now, 
+    manager.releaseCache(thirdCacheFile.toUri(), conf2, 
+        fs.getFileStatus(thirdCacheFile).getModificationTime(), 
         TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
     // Getting the fourth cache will make the number of sub directories becomes
     // 3 which is greater than 2. So the released cache will be deleted.
     Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(), conf2, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(fourthCacheFile), false, 
-        System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(fourthCacheFile).getModificationTime(), true);
     assertFalse("DistributedCache failed deleting old" + 
         " cache when the cache exceeds the number of sub directories limit.",
         localfs.exists(thirdLocalCache));
@@ -590,7 +626,7 @@ public class TestTrackerDistributedCache
         TaskTracker.getPrivateDistributedCacheDir(userName),
         fs.getFileStatus(firstCacheFile), false,
         System.currentTimeMillis(),
-        new Path(TEST_ROOT_DIR), false, false);
+        false);
     assertNotNull("DistributedCache cached file on non-default filesystem.",
         result);
   }
@@ -625,6 +661,8 @@ public class TestTrackerDistributedCache
   protected void tearDown() throws IOException {
     new File(firstCacheFile.toString()).delete();
     new File(secondCacheFile.toString()).delete();
+    new File(firstCacheFilePublic.toString()).delete();
+    new File(secondCacheFilePublic.toString()).delete();
     FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
   }
 
@@ -677,12 +715,14 @@ public class TestTrackerDistributedCache
 
     // ****** Imitate TaskRunner code.
     TaskDistributedCacheManager handle =
-      manager.newTaskDistributedCacheManager(subConf);
+      manager.newTaskDistributedCacheManager(new JobID("jt", 1), subConf);
     assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), 
-        TaskTracker.getPublicDistributedCacheDir());
+    handle.setupCache(TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
+    //TODO this doesn't really happen in the TaskRunner
+//    handle.setupPrivateCache(localDirAllocator, TaskTracker
+//        .getPrivateDistributedCacheDir(userName));
     // ****** End of imitating TaskRunner code
 
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
@@ -702,8 +742,10 @@ public class TestTrackerDistributedCache
     // running a task of the same job
     Throwable th = null;
     try {
-      handle.setup(localDirAllocator, workDir, TaskTracker
-          .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      handle.setupCache(TaskTracker.getPublicDistributedCacheDir(),
+          TaskTracker.getPrivateDistributedCacheDir(userName));
+//      handle.setupPrivateCache(localDirAllocator, TaskTracker
+//          .getPrivateDistributedCacheDir(userName));
     } catch (IOException ie) {
       th = ie;
     }
@@ -721,9 +763,9 @@ public class TestTrackerDistributedCache
     TrackerDistributedCacheManager.determineCacheVisibilities(subConf2);
     
     handle =
-      manager.newTaskDistributedCacheManager(subConf2);
-    handle.setup(localDirAllocator, workDir, TaskTracker
-        .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir());
+      manager.newTaskDistributedCacheManager(new JobID("jt", 2), subConf2);
+    handle.setupCache(TaskTracker.getPublicDistributedCacheDir(), 
+        TaskTracker.getPrivateDistributedCacheDir(userName));
     Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2);
     assertNotNull(null, localCacheFiles2);
     assertEquals(1, localCacheFiles2.length);
@@ -758,20 +800,20 @@ public class TestTrackerDistributedCache
     long now = System.currentTimeMillis();
 
     Path[] localCache = new Path[2];
-    localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, 
+    localCache[0] = manager.getLocalCache(firstCacheFilePublic.toUri(), conf, 
         TaskTracker.getPrivateDistributedCacheDir(userName),
-        fs.getFileStatus(firstCacheFile), false,
-        now, new Path(TEST_ROOT_DIR), false, false);
+        fs.getFileStatus(firstCacheFilePublic), false,
+        fs.getFileStatus(firstCacheFilePublic).getModificationTime(), true);
     FsPermission myPermission = new FsPermission((short)0600);
     Path myFile = new Path(localCache[0].getParent(), "myfile.txt");
     if (FileSystem.create(localfs, myFile, myPermission) == null) {
       throw new IOException("Could not create " + myFile);
     }
     try {
-      localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, 
+      localCache[1] = manager.getLocalCache(secondCacheFilePublic.toUri(), conf, 
           TaskTracker.getPrivateDistributedCacheDir(userName),
-          fs.getFileStatus(secondCacheFile), false, 
-          System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false);
+          fs.getFileStatus(secondCacheFilePublic), false, 
+          fs.getFileStatus(secondCacheFilePublic).getModificationTime(), true);
       FileStatus stat = localfs.getFileStatus(myFile);
       assertTrue(stat.getPermission().equals(myPermission));
       // validate permissions of localized files.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar  4 04:43:33 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -74,17 +75,16 @@ public class ClusterWithLinuxTaskControl
         + "/task-controller";
 
     @Override
-    public void setup() throws IOException {
+    public void setup(LocalDirAllocator allocator) throws IOException {
       // get the current ugi and set the task controller group owner
       getConf().set(TT_GROUP, taskTrackerSpecialGroup);
 
       // write configuration file
       configurationFile = createTaskControllerConf(System
           .getProperty(TASKCONTROLLER_PATH), getConf());
-      super.setup();
+      super.setup(allocator);
     }
 
-    @Override
     protected String getTaskControllerExecutablePath() {
       return taskControllerExePath;
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Mar  4 04:43:33 2011
@@ -74,7 +74,7 @@ public class TestJobTrackerSafeMode exte
    *   - check that after all the trackers are recovered, scheduling is opened 
    */
   private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr) 
-  throws IOException {
+  throws IOException, InterruptedException {
     FileSystem fileSys = dfs.getFileSystem();
     JobConf jobConf = mr.createJobConf();
     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
@@ -218,8 +218,9 @@ public class TestJobTrackerSafeMode exte
 
   /**
    * Test {@link JobTracker}'s safe mode.
+   * @throws InterruptedException 
    */
-  public void testJobTrackerSafeMode() throws IOException {
+  public void testJobTrackerSafeMode() throws IOException,InterruptedException{
     String namenode = null;
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
@@ -278,7 +279,8 @@ public class TestJobTrackerSafeMode exte
     }
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) 
+  throws IOException, InterruptedException {
     new TestJobTrackerSafeMode().testJobTrackerSafeMode();
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJvmManager.java Fri Mar  4 04:43:33 2011
@@ -24,12 +24,19 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Vector;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType;
 import org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner;
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.UserLogManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import static org.junit.Assert.*;
 import org.junit.Before;
@@ -43,6 +50,8 @@ public class TestJvmManager {
   private TaskTracker tt;
   private JvmManager jvmManager;
   private JobConf ttConf;
+  private boolean threadCaughtException = false;
+  private String user;
 
   @Before
   public void setUp() {
@@ -55,16 +64,24 @@ public class TestJvmManager {
   }
 
   public TestJvmManager() throws Exception {
+    user = UserGroupInformation.getCurrentUser().getShortUserName();
     tt = new TaskTracker();
     ttConf = new JobConf();
     ttConf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 2000);
     tt.setConf(ttConf);
     tt.setMaxMapSlots(MAP_SLOTS);
     tt.setMaxReduceSlots(REDUCE_SLOTS);
-    tt.setTaskController(new DefaultTaskController());
+    TaskController dtc;
+    tt.setTaskController((dtc = new DefaultTaskController()));
+    Configuration conf = new Configuration();
+    dtc.setConf(conf);
+    LocalDirAllocator ldirAlloc = new LocalDirAllocator("mapred.local.dir");
+    tt.getTaskController().setup(ldirAlloc);
+    JobID jobId = new JobID("test", 0);
     jvmManager = new JvmManager(tt);
     tt.setJvmManagerInstance(jvmManager);
     tt.setUserLogManager(new UserLogManager(ttConf));
+    tt.setCleanupThread(new InlineCleanupQueue());
   }
 
   // write a shell script to execute the command.
@@ -100,15 +117,21 @@ public class TestJvmManager {
     JobConf taskConf = new JobConf(ttConf);
     TaskAttemptID attemptID = new TaskAttemptID("test", 0, true, 0, 0);
     Task task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     TaskInProgress tip = tt.new TaskInProgress(task, taskConf);
     File pidFile = new File(TEST_DIR, "pid");
-    final TaskRunner taskRunner = task.createRunner(tt, tip);
+    RunningJob rjob = new RunningJob(attemptID.getJobID());
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(ttConf);
+    rjob.distCacheMgr = 
+      new TrackerDistributedCacheManager(ttConf, taskController).
+      newTaskDistributedCacheManager(attemptID.getJobID(), taskConf);
+    final TaskRunner taskRunner = task.createRunner(tt, tip, rjob);
     // launch a jvm which sleeps for 60 seconds
     final Vector<String> vargs = new Vector<String>(2);
     vargs.add(writeScript("SLEEP", "sleep 60\n", pidFile).getAbsolutePath());
     final File workDir = new File(TEST_DIR, "work");
-    workDir.mkdir();
     final File stdout = new File(TEST_DIR, "stdout");
     final File stderr = new File(TEST_DIR, "stderr");
 
@@ -117,10 +140,13 @@ public class TestJvmManager {
       public void run() {
         try {
           taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
-              workDir, null);
+              workDir);
         } catch (InterruptedException e) {
           e.printStackTrace();
           return;
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
         }
       }
     };
@@ -148,7 +174,14 @@ public class TestJvmManager {
     final JvmRunner jvmRunner = mapJvmManager.jvmIdToRunner.get(jvmid);
     Thread killer = new Thread() {
       public void run() {
-        jvmRunner.kill();
+        try {
+          jvmRunner.kill();
+        } catch (IOException e) {
+          e.printStackTrace();
+          setThreadCaughtException();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
       }
     };
     killer.start();
@@ -164,21 +197,25 @@ public class TestJvmManager {
     // launch another jvm and see it finishes properly
     attemptID = new TaskAttemptID("test", 0, true, 0, 1);
     task = new MapTask(null, attemptID, 0, null, MAP_SLOTS);
+    task.setUser(user);
     task.setConf(taskConf);
     tip = tt.new TaskInProgress(task, taskConf);
-    TaskRunner taskRunner2 = task.createRunner(tt, tip);
+    TaskRunner taskRunner2 = task.createRunner(tt, tip, rjob);
     // build dummy vargs to call ls
     Vector<String> vargs2 = new Vector<String>(1);
     vargs2.add(writeScript("LS", "ls", pidFile).getAbsolutePath());
     File workDir2 = new File(TEST_DIR, "work2");
-    workDir.mkdir();
     File stdout2 = new File(TEST_DIR, "stdout2");
     File stderr2 = new File(TEST_DIR, "stderr2");
-    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2,
-        null);
+    taskRunner2.launchJvmAndWait(null, vargs2, stdout2, stderr2, 100, workDir2);
     // join all the threads
     killer.join();
     jvmRunner.join();
     launcher.join();
+    assertFalse("Thread caught unexpected IOException", 
+                 threadCaughtException);
+  }
+  private void setThreadCaughtException() {
+    threadCaughtException = true;
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLinuxTaskController.java Fri Mar  4 04:43:33 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 
 import junit.framework.TestCase;
 
@@ -42,11 +43,6 @@ public class TestLinuxTaskController ext
 
   public static class MyLinuxTaskController extends LinuxTaskController {
     String taskControllerExePath = taskControllerPath + "/task-controller";
-
-    @Override
-    protected String getTaskControllerExecutablePath() {
-      return taskControllerExePath;
-    }
   }
 
   private void validateTaskControllerSetup(TaskController controller,
@@ -55,7 +51,7 @@ public class TestLinuxTaskController ext
       // task controller setup should fail validating permissions.
       Throwable th = null;
       try {
-        controller.setup();
+        controller.setup(new LocalDirAllocator("mapred.local.dir"));
       } catch (IOException ie) {
         th = ie;
       }
@@ -64,7 +60,7 @@ public class TestLinuxTaskController ext
           + INVALID_TASKCONTROLLER_PERMISSIONS, th.getMessage().contains(
           "with exit code " + INVALID_TASKCONTROLLER_PERMISSIONS));
     } else {
-      controller.setup();
+      controller.setup(new LocalDirAllocator("mapred.local.dir"));
     }
 
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar  4 04:43:33 2011
@@ -156,7 +156,6 @@ public class TestMiniMRWithDFS extends T
           .isDirectory());
       LOG.info("Verifying contents of mapred.local.dir "
           + localDir.getAbsolutePath());
-
       // Verify contents(user-dir) of tracker-sub-dir
       File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
       if (trackerSubDir.isDirectory()) {