Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:02 UTC

svn commit: r1077111 [3/3] - in /hadoop/common/branches/branch-0.20-security-patches: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/c...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:42:01 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -219,13 +220,19 @@ public class TaskTracker 
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  private static final String SUBDIR = "taskTracker";
-  private static final String CACHEDIR = "archive";
-  private static final String JOBCACHE = "jobcache";
-  private static final String OUTPUT = "output";
+  static final String SUBDIR = "taskTracker";
+  private static final String DISTCACHEDIR = "distcache";
+  static final String JOBCACHE = "jobcache";
+  static final String OUTPUT = "output";
+  private static final String JARSDIR = "jars";
+  static final String LOCAL_SPLIT_FILE = "split.info";
+  static final String JOBFILE = "job.xml";
+
+  static final String JOB_LOCAL_DIR = "job.local.dir";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
-  private JobConf originalConf;
+
   private JobConf fConf;
+  private JobConf originalConf;
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
@@ -435,8 +442,8 @@ public class TaskTracker 
     return TaskTracker.SUBDIR + Path.SEPARATOR + user;
   } 
 
-  static String getCacheSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+  static String getDistributedCacheDir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
 
   static String getJobCacheSubdir() {
@@ -449,31 +456,66 @@ public class TaskTracker 
   } 
 
   static String getLocalJobDir(String jobid) {
-	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
+    return getJobCacheSubdir() + Path.SEPARATOR + jobid;
   }
 
-  static String getLocalTaskDir(String jobid, String taskid) {
-	return getLocalTaskDir(jobid, taskid, false) ; 
+  static String getLocalJobConfFile(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
-  static String getIntermediateOutputDir(String jobid, String taskid) {
-	return getLocalTaskDir(jobid, taskid) 
-           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+  static String getTaskConfFile(String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+        + TaskTracker.JOBFILE;
   }
-  
-  static String getLocalJobTokenFile(String user, String jobid) {
-    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+
+  static String getJobJarsDir(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+  }
+
+  static String getJobJarFile(String jobid) {
+    return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+  }
+
+  static String getJobWorkDir(String jobid) {
+    return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
+  static String getLocalSplitFile(String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+    + TaskTracker.LOCAL_SPLIT_FILE;
+  }
+
+  static String getIntermediateOutputDir(String jobid, String taskid) {
+    return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+        + TaskTracker.OUTPUT;
+  }
 
-  static String getLocalTaskDir(String jobid, 
-                                String taskid, 
-                                boolean isCleanupAttempt) {
-	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
-	if (isCleanupAttempt) { 
+  static String getLocalTaskDir(String jobid, String taskid) {
+    return getLocalTaskDir(jobid, taskid, false);
+  }
+
+  static String getLocalTaskDir(String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
-	}
-	return taskDir;
+    }
+    return taskDir;
+  }
+
+  static String getTaskWorkDir(String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String dir =
+      getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      dir = dir + TASK_CLEANUP_SUFFIX;
+    }
+    return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+  }
+
+  static String getLocalJobTokenFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
   }
 
   private void setUgi(String user, Configuration conf) {
@@ -841,92 +883,25 @@ public class TaskTracker 
     Path localJarFile = null;
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
-    Path jobFile = new Path(t.getJobFile());
-    String userName = t.getUser();
-    JobConf userConf = new JobConf(getJobConf());
-    setUgi(userName, userConf);
-    FileSystem userFs = jobFile.getFileSystem(userConf);
-    // Get sizes of JobFile and JarFile
-    // sizes are -1 if they are not present.
-    FileStatus status = null;
-    long jobFileSize = -1;
-    try {
-      status = userFs.getFileStatus(jobFile);
-      jobFileSize = status.getLen();
-    } catch(FileNotFoundException fe) {
-      jobFileSize = -1;
-    }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite(
-                                    getLocalJobDir(jobId.toString())
-                                    + Path.SEPARATOR + "job.xml",
-                                    jobFileSize, fConf);
+
     RunningJob rjob = addTaskToJob(jobId, tip);
     synchronized (rjob) {
       if (!rjob.localized) {
-  
-        FileSystem localFs = FileSystem.getLocal(fConf);
-        // this will happen on a partial execution of localizeJob.
-        // Sometimes the job.xml gets copied but copying job.jar
-        // might throw out an exception
-        // we should clean up and then try again
-        Path jobDir = localJobFile.getParent();
-        if (localFs.exists(jobDir)){
-          localFs.delete(jobDir, true);
-          boolean b = localFs.mkdirs(jobDir);
-          if (!b)
-            throw new IOException("Not able to create job directory "
-                                  + jobDir.toString());
-        }
-        userFs.copyToLocalFile(jobFile, localJobFile);
-        JobConf localJobConf = new JobConf(localJobFile);
-        
-        // create the 'work' directory
-        // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite(
-                         (getLocalJobDir(jobId.toString())
-                         + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
-        if (!localFs.mkdirs(workDir)) {
-          throw new IOException("Mkdirs failed to create " 
-                      + workDir.toString());
-        }
-        System.setProperty("job.local.dir", workDir.toString());
-        localJobConf.set("job.local.dir", workDir.toString());
-        
-        // copy Jar file to the local FS and unjar it.
-        String jarFile = localJobConf.getJar();
-        long jarFileSize = -1;
-        if (jarFile != null) {
-          Path jarFilePath = new Path(jarFile);
-          try {
-            status = userFs.getFileStatus(jarFilePath);
-            jarFileSize = status.getLen();
-          } catch(FileNotFoundException fe) {
-            jarFileSize = -1;
-          }
-          // Here we check for and we check five times the size of jarFileSize
-          // to accommodate for unjarring the jar file in work directory 
-          localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getLocalJobDir(jobId.toString())
-                                     + Path.SEPARATOR + "jars",
-                                     5 * jarFileSize, fConf), "job.jar");
-          if (!localFs.mkdirs(localJarFile.getParent())) {
-            throw new IOException("Mkdirs failed to create jars directory "); 
-          }
-          userFs.copyToLocalFile(jarFilePath, localJarFile);
-          localJobConf.setJar(localJarFile.toString());
-          OutputStream out = localFs.create(localJobFile);
-          try {
-            localJobConf.writeXml(out);
-          } finally {
-            out.close();
-          }
-          // also unjar the job.jar files 
-          RunJar.unJar(new File(localJarFile.toString()),
-                       new File(localJarFile.getParent().toString()));
-        }
+        JobConf localJobConf = localizeJobFiles(t);
+        
+        // Now initialize the job via the task-controller so as to set
+        // ownership/permissions of the jars and the job-work-dir. Note that
+        // initializeJob should be called only after every other directory/file
+        // directly under the job directory has been created.
+        JobInitializationContext context = new JobInitializationContext();
+        context.jobid = jobId;
+        context.user = localJobConf.getUser();
+        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
+        taskController.initializeJob(context);
+        
+        rjob.jobConf = localJobConf;  
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
-        rjob.jobConf = localJobConf;
         // save local copy of JobToken file
         localizeJobTokenFile(t.getUser(), jobId, localJobConf);       
         FSDataInputStream in = localFs.open(new Path(
@@ -936,13 +911,319 @@ public class TaskTracker 
         getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
  
         rjob.localized = true;
-        taskController.initializeJob(jobId);
       }
     }
     launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
   }
 
-  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+  /**
+   * Localize the job on this tasktracker. Specifically
+   * <ul>
+   * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the job config file job.xml from the FS</li>
+   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+   * in the configuration.</li>
+   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
+   * file in the configuration.</li>
+   * </ul>
+   *
+   * @param t task whose job has to be localized on this TT
+   * @return the modified job configuration to be used for all the tasks of this
+   *         job as a starting point.
+   * @throws IOException
+   */
+  JobConf localizeJobFiles(Task t)
+      throws IOException {
+    JobID jobId = t.getJobID();
+
+    Path jobFile = new Path(t.getJobFile());
+    String userName = t.getUser();
+    JobConf userConf = new JobConf(getJobConf());
+    setUgi(userName, userConf);
+    FileSystem userFs = jobFile.getFileSystem(userConf);
+
+    // Initialize the job directories first
+    FileSystem localFs = FileSystem.getLocal(fConf);
+    initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+    // Download the job.xml for this job from the system FS
+    Path localJobFile =
+        localizeJobConfFile(new Path(t.getJobFile()), userFs, jobId);
+
+    JobConf localJobConf = new JobConf(localJobFile);
+
+    // create the 'job-work' directory: job-specific shared directory for use as
+    // scratch space by all tasks of the same job running on this TaskTracker.
+    Path workDir =
+        lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+          fConf);
+    if (!localFs.mkdirs(workDir)) {
+      throw new IOException("Mkdirs failed to create "
+          + workDir.toString());
+    }
+    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+    // Download the job.jar for this job from the system FS
+    localizeJobJarFile(jobId, userFs, localJobConf);
+
+    return localJobConf;
+  }
+
+  static class PermissionsHandler {
+    /**
+     * Permission information useful for setting permissions for a given path.
+     * Using this, one can set all possible combinations of permissions for the
+     * owner of the file. But permissions for the group and all others can only
+     * be set together, i.e. permissions for group cannot be set different from
+     * those for others and vice versa.
+     */
+    static class PermissionsInfo {
+      public boolean readPermissions;
+      public boolean writePermissions;
+      public boolean executablePermissions;
+      public boolean readPermsOwnerOnly;
+      public boolean writePermsOwnerOnly;
+      public boolean executePermsOwnerOnly;
+
+      /**
+       * Create a permissions-info object with the given attributes
+       *
+       * @param readPerms
+       * @param writePerms
+       * @param executePerms
+       * @param readOwnerOnly
+       * @param writeOwnerOnly
+       * @param executeOwnerOnly
+       */
+      public PermissionsInfo(boolean readPerms, boolean writePerms,
+          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+          boolean executeOwnerOnly) {
+        readPermissions = readPerms;
+        writePermissions = writePerms;
+        executablePermissions = executePerms;
+        readPermsOwnerOnly = readOwnerOnly;
+        writePermsOwnerOnly = writeOwnerOnly;
+        executePermsOwnerOnly = executeOwnerOnly;
+      }
+    }
+
+    /**
+     * Set permission on the given file path using the specified permissions
+     * information. We use the Java API to set permissions instead of spawning chmod
+     * processes, which saves a lot of time. Using this, one can set all possible
+     * combinations of permissions for the owner of the file. But permissions
+     * for the group and all others can only be set together, i.e. permissions
+     * for the group cannot be set differently from those for others and vice versa.
+     *
+     * This method should satisfy the needs of most of the applications. For
+     * those it doesn't, {@link FileUtil#chmod} can be used.
+     *
+     * @param f file path
+     * @param pInfo permissions information
+     * @return true if success, false otherwise
+     */
+    static boolean setPermissions(File f, PermissionsInfo pInfo) {
+      if (pInfo == null) {
+        LOG.debug(" PermissionsInfo is null, returning.");
+        return true;
+      }
+
+      LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+      boolean ret = true;
+
+      // Clear all the flags
+      ret = f.setReadable(false, false) && ret;
+      ret = f.setWritable(false, false) && ret;
+      ret = f.setExecutable(false, false) && ret;
+
+      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
+      LOG.debug("Readable status for " + f + " set to " + ret);
+      ret =
+        f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
+        && ret;
+      LOG.debug("Writable status for " + f + " set to " + ret);
+      ret =
+        f.setExecutable(pInfo.executablePermissions,
+            pInfo.executePermsOwnerOnly)
+            && ret;
+
+      LOG.debug("Executable status for " + f + " set to " + ret);
+      return ret;
+    }
+
+    /**
+     * Permissions rwxr-xr-x (755)
+     */
+    static PermissionsInfo sevenFiveFive =
+      new PermissionsInfo(true, true, true, false, true, false);
+    /**
+     * Completely private permissions
+     */
+    static PermissionsInfo sevenZeroZero =
+      new PermissionsInfo(true, true, true, true, true, true);
+  }
+
+  /**
+   * Prepare the job directories for a given job. To be called by the job
+   * localization code, only if the job is not already localized.
+   *
+   * <br>
+   * Here, we set 700 permissions on the job directories created on all disks.
+   * This is done to avoid any misuse by other users until
+   * {@link TaskController#initializeJob(JobInitializationContext)} runs later
+   * and sets the proper private permissions on the job directories. <br>
+   *
+   * @param jobId
+   * @param fs
+   * @param localDirs
+   * @throws IOException
+   */
+  private static void initializeJobDirs(JobID jobId, FileSystem fs,
+      String[] localDirs)
+  throws IOException {
+    boolean initJobDirStatus = false;
+    String jobDirPath = getLocalJobDir(jobId.toString());
+    for (String localDir : localDirs) {
+      Path jobDir = new Path(localDir, jobDirPath);
+      if (fs.exists(jobDir)) {
+        // this will happen on a partial execution of localizeJob. Sometimes
+        // copying job.xml to the local disk succeeds but copying job.jar might
+        // throw out an exception. We should clean up and then try again.
+        fs.delete(jobDir, true);
+      }
+
+      boolean jobDirStatus = fs.mkdirs(jobDir);
+      if (!jobDirStatus) {
+        LOG.warn("Not able to create job directory " + jobDir.toString());
+      }
+
+      initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+      // job-dir has to be private to the TT
+      PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
+          PermissionsHandler.sevenZeroZero);
+    }
+
+    if (!initJobDirStatus) {
+      throw new IOException("Not able to initialize job directories "
+          + "in any of the configured local directories for job "
+          + jobId.toString());
+    }
+  }
+
+  /**
+   * Download the job configuration file from the FS.
+   *
+   * @param jobFile the job configuration file (job.xml) to be downloaded
+   * @param jobId jobid of the task
+   * @return the local file system path of the downloaded file.
+   * @throws IOException
+   */
+  private Path localizeJobConfFile(Path jobFile, FileSystem userFs, JobID jobId)
+  throws IOException {
+    // Get the size of the job file;
+    // the size is -1 if the file is not present.
+    FileStatus status = null;
+    long jobFileSize = -1;
+    try {
+      status = userFs.getFileStatus(jobFile);
+      jobFileSize = status.getLen();
+    } catch(FileNotFoundException fe) {
+      jobFileSize = -1;
+    }
+    Path localJobFile =
+      lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+          jobFileSize, fConf);
+
+    // Download job.xml
+    userFs.copyToLocalFile(jobFile, localJobFile);
+    return localJobFile;
+  }
+
+  /**
+   * Download the job jar file from FS to the local file system and unjar it.
+   * Set the local jar file in the passed configuration.
+   *
+   * @param jobId
+   * @param userFs
+   * @param localJobConf
+   * @throws IOException
+   */
+  private void localizeJobJarFile(JobID jobId, FileSystem userFs,
+      JobConf localJobConf)
+  throws IOException {
+    // copy Jar file to the local FS and unjar it.
+    String jarFile = localJobConf.getJar();
+    FileStatus status = null;
+    long jarFileSize = -1;
+    if (jarFile != null) {
+      Path jarFilePath = new Path(jarFile);
+      try {
+        status = userFs.getFileStatus(jarFilePath);
+        jarFileSize = status.getLen();
+      } catch (FileNotFoundException fe) {
+        jarFileSize = -1;
+      }
+      // Here we allocate five times the size of jarFileSize to accommodate
+      // for un-jarring the jar file in the job's jars directory
+      Path localJarFile =
+        lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+            5 * jarFileSize, fConf);
+
+      //Download job.jar
+      userFs.copyToLocalFile(jarFilePath, localJarFile);
+
+      localJobConf.setJar(localJarFile.toString());
+
+      // Also un-jar the job.jar files. We un-jar it so that classes inside
+      // sub-directories, for e.g., lib/, classes/ are available on class-path
+      RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
+          .getParent().toString()));
+    }
+  }
+
+  /**
+   * Create taskDirs on all the disks. Otherwise, in some cases, such as when
+   * LinuxTaskController is in use, the child might wish to balance load across
+   * disks but cannot itself create the attempt directory because the job
+   * directory is writable only by the TT.
+   *
+   * @param jobId
+   * @param attemptId
+   * @param isCleanupAttempt
+   * @param fs
+   * @param localDirs
+   * @throws IOException
+   */
+  private static void initializeAttemptDirs(String jobId, String attemptId,
+      boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+  throws IOException {
+
+    boolean initStatus = false;
+    String attemptDirPath =
+      getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+    for (String localDir : localDirs) {
+      Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+      boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+      if (!attemptDirStatus) {
+        LOG.warn("localAttemptDir " + localAttemptDir.toString()
+            + " couldn't be created.");
+      }
+      initStatus = initStatus || attemptDirStatus;
+    }
+
+    if (!initStatus) {
+      throw new IOException("Not able to initialize attempt directories "
+          + "in any of the configured local directories for the attempt "
+          + attemptId);
+    }
+  }
+  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf)
+      throws IOException{
     synchronized (tip) {
       tip.setJobConf(jobConf);
       tip.launchTask();
@@ -1021,6 +1302,17 @@ public class TaskTracker 
   }
 
   /**
+   * For testing
+   */
+  TaskTracker() {
+    server = null;
+  }
+
+  void setConf(JobConf conf) {
+    fConf = conf;
+  }
+
+  /**
    * Start with the local machine name, and the default JobTracker
    */
   public TaskTracker(JobConf conf) throws IOException {
@@ -1061,13 +1353,6 @@ public class TaskTracker 
     initialize();
   }
 
-  /**
-   * Blank constructor. Only usable by tests.
-   */
-  TaskTracker() {
-    server = null;
-  }
-
   private void checkJettyPort(int port) throws IOException { 
     //See HADOOP-4744
     if (port < 0) {
@@ -1695,10 +1980,9 @@ public class TaskTracker 
       }
       
       MapOutputFile mapOutputFile = new MapOutputFile();
-      mapOutputFile.setJobId(taskId.getJobID());
       mapOutputFile.setConf(conf);
       
-      Path tmp_output =  mapOutputFile.getOutputFile(taskId);
+      Path tmp_output =  mapOutputFile.getOutputFile();
       if(tmp_output == null)
         return 0;
       FileSystem localFS = FileSystem.getLocal(conf);
@@ -1988,54 +2272,36 @@ public class TaskTracker 
       taskTimeout = (10 * 60 * 1000);
     }
         
-    private void localizeTask(Task task) throws IOException{
+    void localizeTask(Task task) throws IOException{
 
-      Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite(
-          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
-            task.getTaskID().toString(), task.isTaskCleanupTask()), 
-          defaultJobConf );
-      
       FileSystem localFs = FileSystem.getLocal(fConf);
-      if (!localFs.mkdirs(localTaskDir)) {
-        throw new IOException("Mkdirs failed to create " 
-                    + localTaskDir.toString());
-      }
 
-      // create symlink for ../work if it already doesnt exist
-      String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getLocalJobDir(task.getJobID().toString())
-                         + Path.SEPARATOR  
-                         + "work", defaultJobConf).toString();
-      String link = localTaskDir.getParent().toString() 
-                      + Path.SEPARATOR + "work";
-      File flink = new File(link);
-      if (!flink.exists())
-        FileUtil.symLink(workDir, link);
-      
+      // create taskDirs on all the disks.
+      initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+          .toString(), task.isTaskCleanupTask(), localFs, fConf
+          .getStrings("mapred.local.dir"));
+
       // create the working-directory of the task 
-      Path cwd = lDirAlloc.getLocalPathForWrite(
-                   getLocalTaskDir(task.getJobID().toString(), 
-                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
-                   + Path.SEPARATOR + MRConstants.WORKDIR,
-                   defaultJobConf);
+      Path cwd =
+          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+              .toString(), task.getTaskID().toString(), task
+              .isTaskCleanupTask()), defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
       }
 
-      Path localTaskFile = new Path(localTaskDir, "job.xml");
-      task.setJobFile(localTaskFile.toString());
       localJobConf.set("mapred.local.dir",
                        fConf.get("mapred.local.dir"));
+
       if (fConf.get("slave.host.name") != null) {
         localJobConf.set("slave.host.name",
                          fConf.get("slave.host.name"));
       }
             
-      localJobConf.set("mapred.task.id", task.getTaskID().toString());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
 
+      // Do the task-type specific localization
       task.localizeConfiguration(localJobConf);
       
       List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
@@ -2071,12 +2337,6 @@ public class TaskTracker 
       if (isTaskMemoryManagerEnabled()) {
         localJobConf.setBoolean("task.memory.mgmt.enabled", true);
       }
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        localJobConf.writeXml(out);
-      } finally {
-        out.close();
-      }
       task.setConf(localJobConf);
     }
         
@@ -2349,7 +2609,7 @@ public class TaskTracker 
                                      localJobConf). toString());
               } catch (IOException e) {
                 LOG.warn("Working Directory of the task " + task.getTaskID() +
-                		 "doesnt exist. Caught exception " +
+                                " does not exist. Caught exception " +
                           StringUtils.stringifyException(e));
               }
               // Build the command  
@@ -2630,34 +2890,39 @@ public class TaskTracker 
           if (localJobConf == null) {
             return;
           }
-          String taskDir = getLocalTaskDir(task.getJobID().toString(),
-                             taskId.toString(), task.isTaskCleanupTask());
+          String localTaskDir =
+              getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+                  task.isTaskCleanupTask());
+          String taskWorkDir =
+              getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+                  task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
               //and reduce inputs get stored)
               runner.close();
             }
-            //We don't delete the workdir
-            //since some other task (running in the same JVM) 
-            //might be using the dir. The JVM running the tasks would clean
-            //the workdir per a task in the task process itself.
+
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              // No jvm reuse, remove everything
               directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
-                  taskDir));
+                  localTaskDir));
             }  
-            
             else {
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                taskDir+"/job.xml"));
+              // Jvm reuse. We don't delete the workdir since some other task
+              // (running in the same JVM) might be using the dir. The JVM
+              // running the tasks would clean the workdir per a task in the
+              // task process itself.
+              directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+                  defaultJobConf, localTaskDir + Path.SEPARATOR
+                      + TaskTracker.JOBFILE));
             }
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
               directoryCleanupThread.addToQueue(localFs,
                   getLocalFiles(defaultJobConf,
-                  taskDir+"/work"));
+                  taskWorkDir));
             }  
           }
         } catch (Throwable ie) {

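For readers following the new path helpers added above (getLocalJobDir, getLocalJobConfFile, getJobJarsDir, getJobJarFile, getJobWorkDir, getTaskWorkDir, getIntermediateOutputDir), the sketch below only illustrates the relative directory tree they produce under each configured mapred.local.dir root. The class name, separator constant and job/attempt ids are hypothetical and exist purely for the example; they are not part of this patch.

// Illustrative sketch only: mirrors the relative paths produced by the new
// TaskTracker helpers for a hypothetical job and map-task attempt.
public class LocalDirLayoutSketch {
  private static final String SEP = "/"; // stands in for Path.SEPARATOR

  static String localJobDir(String jobid) {
    // TaskTracker.getLocalJobDir(jobid): taskTracker/jobcache/<jobid>
    return "taskTracker" + SEP + "jobcache" + SEP + jobid;
  }

  public static void main(String[] args) {
    String jobid = "job_201103040342_0001";                  // hypothetical id
    String attempt = "attempt_201103040342_0001_m_000000_0"; // hypothetical id

    // getLocalJobConfFile: the localized job.xml
    System.out.println(localJobDir(jobid) + SEP + "job.xml");
    // getJobJarFile: the localized job.jar inside the jars directory
    System.out.println(localJobDir(jobid) + SEP + "jars" + SEP + "job.jar");
    // getJobWorkDir: job-wide scratch space (MRConstants.WORKDIR, i.e. "work")
    System.out.println(localJobDir(jobid) + SEP + "work");
    // getTaskWorkDir: the attempt's working directory
    System.out.println(localJobDir(jobid) + SEP + attempt + SEP + "work");
    // getIntermediateOutputDir: map outputs / reduce inputs for the attempt
    System.out.println(localJobDir(jobid) + SEP + attempt + SEP + "output");
  }
}

Each of these paths is rooted under one of the TaskTracker's mapred.local.dir entries, which is why initializeJobDirs and initializeAttemptDirs above iterate over all configured local directories.
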
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar  4 03:42:01 2011
@@ -47,7 +47,7 @@ import junit.framework.TestCase;
  * <li>Make the built binary to setuid executable</li>
  * <li>Execute following targets:
  * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em> 
- * -Dtaskcontroller-user=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
  * </ol>
  * 
  */
@@ -82,6 +82,9 @@ public class ClusterWithLinuxTaskControl
 
   private static final int NUMBER_OF_NODES = 1;
 
+  static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+  static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
   private File configurationFile = null;
 
   private UserGroupInformation taskControllerUser;
@@ -98,18 +101,20 @@ public class ClusterWithLinuxTaskControl
         MyLinuxTaskController.class.getName());
     mrCluster =
         new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
-            .toString(), 1, null, null, conf);
+            .toString(), 4, null, null, conf);
 
     // Get the configured taskcontroller-path
-    String path = System.getProperty("taskcontroller-path");
-    createTaskControllerConf(path);
+    String path = System.getProperty(TASKCONTROLLER_PATH);
+    configurationFile =
+        createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
+            .getLocalDirs());
     String execPath = path + "/task-controller";
     TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
     // TypeCasting the parent to our TaskController instance as we
     // know that that would be instance which should be present in TT.
     ((MyLinuxTaskController) tracker.getTaskController())
         .setTaskControllerExe(execPath);
-    String ugi = System.getProperty("taskcontroller-user");
+    String ugi = System.getProperty(TASKCONTROLLER_UGI);
     clusterConf = mrCluster.createJobConf();
     String[] splits = ugi.split(",");
     taskControllerUser = new UnixUserGroupInformation(splits);
@@ -140,21 +145,39 @@ public class ClusterWithLinuxTaskControl
         taskControllerUser.getGroupNames()[0]);
   }
 
-  private void createTaskControllerConf(String path)
+  /**
+   * Create taskcontroller.cfg.
+   * 
+   * @param path Path to the taskcontroller binary.
+   * @param localDirs
+   * @return the created conf file
+   * @throws IOException
+   */
+  static File createTaskControllerConf(String path, String[] localDirs)
       throws IOException {
     File confDirectory = new File(path, "../conf");
     if (!confDirectory.exists()) {
       confDirectory.mkdirs();
     }
-    configurationFile = new File(confDirectory, "taskcontroller.cfg");
+    File configurationFile = new File(confDirectory, "taskcontroller.cfg");
     PrintWriter writer =
         new PrintWriter(new FileOutputStream(configurationFile));
 
-    writer.println(String.format("mapred.local.dir=%s", mrCluster
-        .getTaskTrackerLocalDir(0)));
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < localDirs.length; i++) {
+      sb.append(localDirs[i]);
+      if ((i + 1) != localDirs.length) {
+        sb.append(",");
+      }
+    }
+    writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+    writer
+        .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
 
     writer.flush();
     writer.close();
+    return configurationFile;
   }
 
   /**
@@ -162,28 +185,35 @@ public class ClusterWithLinuxTaskControl
    * 
    * @return boolean
    */
-  protected boolean shouldRun() {
-    return isTaskExecPathPassed() && isUserPassed();
+  protected static boolean shouldRun() {
+    if (!isTaskExecPathPassed() || !isUserPassed()) {
+      LOG.info("Not running test.");
+      return false;
+    }
+    return true;
   }
 
-  private boolean isTaskExecPathPassed() {
-    String path = System.getProperty("taskcontroller-path");
+  private static boolean isTaskExecPathPassed() {
+    String path = System.getProperty(TASKCONTROLLER_PATH);
     if (path == null || path.isEmpty()
-        || path.equals("${taskcontroller-path}")) {
+        || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+      LOG.info("Invalid taskcontroller-path : " + path); 
       return false;
     }
     return true;
   }
 
-  private boolean isUserPassed() {
-    String ugi = System.getProperty("taskcontroller-user");
-    if (ugi != null && !(ugi.equals("${taskcontroller-user}"))
+  private static boolean isUserPassed() {
+    String ugi = System.getProperty(TASKCONTROLLER_UGI);
+    if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
         && !ugi.isEmpty()) {
       if (ugi.indexOf(",") > 1) {
         return true;
       }
+      LOG.info("Invalid taskcontroller-ugi : " + ugi); 
       return false;
     }
+    LOG.info("Invalid taskcontroller-ugi : " + ugi);
     return false;
   }
 

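As a rough illustration of what the reworked createTaskControllerConf writes, here is a minimal standalone sketch. The directories, the log-dir value and the class name are assumptions made up for the example; the real test derives them from the taskcontroller-path system property, the MiniMRCluster local dirs and TaskLog.getBaseLogDir().

// Minimal sketch, assuming hypothetical paths: writes a taskcontroller.cfg with
// the same two keys the test above generates for the setuid task-controller.
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;

public class TaskControllerConfSketch {
  public static void main(String[] args) throws IOException {
    String[] localDirs = { "/tmp/mapred/local_0", "/tmp/mapred/local_1" }; // hypothetical
    String logDir = "/tmp/hadoop/logs";                                    // hypothetical

    File confDir = new File("/tmp/task-controller/conf");                  // hypothetical
    confDir.mkdirs();
    PrintWriter writer =
        new PrintWriter(new FileOutputStream(new File(confDir, "taskcontroller.cfg")));

    // mapred.local.dir must list every TaskTracker local directory, comma-separated
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < localDirs.length; i++) {
      sb.append(localDirs[i]);
      if (i + 1 != localDirs.length) {
        sb.append(",");
      }
    }
    writer.println(String.format("mapred.local.dir=%s", sb.toString()));
    // hadoop.log.dir (taken from TaskLog.getBaseLogDir() in the test)
    writer.println(String.format("hadoop.log.dir=%s", logDir));

    writer.flush();
    writer.close();
  }
}
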
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  4 03:42:01 2011
@@ -163,7 +163,7 @@ public class MiniMRCluster {
       StringBuffer localPath = new StringBuffer();
       for(int i=0; i < numDir; ++i) {
         File ttDir = new File(localDirBase, 
-                              Integer.toString(trackerId) + "_" + 0);
+                              Integer.toString(trackerId) + "_" + i);
         if (!ttDir.mkdirs()) {
           if (!ttDir.isDirectory()) {
             throw new IOException("Mkdirs failed to create " + ttDir);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java Fri Mar  4 03:42:01 2011
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -99,20 +100,17 @@ public class TestIsolationRunner extends
   
   private Path getAttemptJobXml(JobConf conf, JobID jobId, boolean isMap)
       throws IOException {
-    String[] localDirs = conf.getLocalDirs();
-    assertEquals(1, localDirs.length);
-    Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
-        "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);    
-    Path attemptDir = new Path(jobCacheDir,
-        new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString());    
-    return new Path(attemptDir, "job.xml");
+    String taskid =
+        new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString();
+    return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+        TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
   }
 
   public void testIsolationRunOfMapTask() throws 
       IOException, InterruptedException, ClassNotFoundException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(1, "file:///", 1);
+      mr = new MiniMRCluster(1, "file:///", 4);
 
       // Run a job succesfully; keep task files.
       JobConf conf = mr.createJobConf();
@@ -131,7 +129,9 @@ public class TestIsolationRunner extends
       // run IsolationRunner against the map task.
       FileSystem localFs = FileSystem.getLocal(conf);
       Path mapJobXml =
-          getAttemptJobXml(conf, jobId, true).makeQualified(localFs);
+          getAttemptJobXml(
+              mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(),
+              jobId, true).makeQualified(localFs);
       assertTrue(localFs.exists(mapJobXml));
       
       new IsolationRunner().run(new String[] {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Fri Mar  4 03:42:01 2011
@@ -20,14 +20,11 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
-import java.io.DataOutputStream;
-import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -43,42 +40,34 @@ public class TestJobExecutionAsDifferent
       return;
     }
     startCluster();
-    submitWordCount(getClusterConf());
-  }
-  
-  private void submitWordCount(JobConf clientConf) throws IOException {
-    Path inDir = new Path("testing/wc/input");
-    Path outDir = new Path("testing/wc/output");
-    JobConf conf = new JobConf(clientConf);
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir, true);
-    if (!fs.mkdirs(inDir)) {
-      throw new IOException("Mkdirs failed to create " + inDir.toString());
-    }
+    Path inDir = new Path("input");
+    Path outDir = new Path("output");
+
+    RunningJob job;
 
-    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-    file.writeBytes("a b c d e f g h");
-    file.close();
-
-    conf.setJobName("wordcount");
-    conf.setInputFormat(TextInputFormat.class);
-
-    // the keys are words (strings)
-    conf.setOutputKeyClass(Text.class);
-    // the values are counts (ints)
-    conf.setOutputValueClass(IntWritable.class);
-
-    conf.setMapperClass(WordCount.MapClass.class);
-    conf.setCombinerClass(WordCount.Reduce.class);
-    conf.setReducerClass(WordCount.Reduce.class);
-
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(1);
-    RunningJob rj = JobClient.runJob(conf);
-    assertTrue("Job Failed", rj.isSuccessful());
+    // Run a job with zero maps/reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+    job.waitForCompletion();
+    assertTrue("Job failed", job.isSuccessful());
     assertOwnerShip(outDir);
+
+    // Run a job with 1 map and zero reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+    job.waitForCompletion();
+    assertTrue("Job failed", job.isSuccessful());
+    assertOwnerShip(outDir);
+
+    // Run a normal job with maps/reduces
+    job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+    job.waitForCompletion();
+    assertTrue("Job failed", job.isSuccessful());
+    assertOwnerShip(outDir);
+
+    // Run a job with jvm reuse
+    JobConf myConf = getClusterConf();
+    myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+    String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+    assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
   }
   
   public void testEnvironment() throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Mar  4 03:42:01 2011
@@ -350,15 +350,26 @@ public class TestKillSubProcesses extend
     if (ProcessTree.isSetsidAvailable) {
       FileSystem fs = FileSystem.getLocal(conf);
 
-      if(fs.exists(scriptDir)){
+      if (fs.exists(scriptDir)) {
         fs.delete(scriptDir, true);
       }
-      // create shell script
-      Random rm = new Random();
+
+      // Create the directory and set open permissions so that the TT can
+      // access it.
+      fs.mkdirs(scriptDir);
+      fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+          FsAction.ALL));
+
+      // create shell script
+      Random rm = new Random();
       Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
         + ".sh");
       String shellScript = scriptPath.toString();
+
+      // Construct the script. Set umask to 000 so that the TT can access all the
+      // files.
       String script =
+        "umask 000\n" + 
         "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
         "echo hello\n" +
         "trap 'echo got SIGTERM' 15 \n" +
@@ -373,7 +384,10 @@ public class TestKillSubProcesses extend
       file.writeBytes(script);
       file.close();
 
-      LOG.info("Calling script from map task of failjob : " + shellScript);
+      // Set executable permissions on the script.
+      new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+      LOG.info("Calling script from map task : " + shellScript);
       Runtime.getRuntime()
           .exec(shellScript + " " + numLevelsOfSubProcesses);
     

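The TestKillSubProcesses change above is about visibility: with LinuxTaskController the map task runs as a different user, so the script directory is created world-accessible, the script sets umask 000, and the script file itself is marked executable. The following is only a sketch of that idea using plain java.io.File calls; the paths and class name are invented for the example.

// Sketch only: make a test-generated script directory and script accessible to
// a task that runs as a different user (as it does under LinuxTaskController).
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class ScriptPermissionsSketch {
  public static void main(String[] args) throws IOException {
    File scriptDir = new File("/tmp/test-scripts");   // hypothetical location
    scriptDir.mkdirs();
    // Open up the directory (roughly FsPermission(ALL, ALL, ALL) in the test)
    scriptDir.setReadable(true, false);
    scriptDir.setWritable(true, false);
    scriptDir.setExecutable(true, false);

    File script = new File(scriptDir, "_shellScript_0.sh");
    FileWriter out = new FileWriter(script);
    out.write("umask 000\n");                         // children create world-accessible files
    out.write("echo $$ > " + scriptDir + "/childPidFile$1\n");
    out.close();

    // Let the task user read and execute the script
    script.setReadable(true, false);
    script.setExecutable(true, false);
  }
}
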
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077111&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar  4 03:42:01 2011
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ * 
+ */
+public class TestLocalizationWithLinuxTaskController extends
+    TestTaskTrackerLocalization {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+  private File configFile;
+  private MyLinuxTaskController taskController;
+
+  @Override
+  protected void setUp()
+      throws Exception {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    super.setUp();
+
+    taskController = new MyLinuxTaskController();
+    String path =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+        ClusterWithLinuxTaskController.createTaskControllerConf(path,
+            localDirs);
+    String execPath = path + "/task-controller";
+    taskController.setTaskControllerExe(execPath);
+    taskController.setConf(trackerFConf);
+    taskController.setup();
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.tearDown();
+    if (configFile != null) {
+      configFile.delete();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void testTaskControllerSetup() {
+    // Do nothing.
+  }
+
+  /**
+   * Test job localization with {@link LinuxTaskController}. Also check the
+   * permissions and file ownership of the job related files.
+   */
+  @Override
+  public void testJobLocalization()
+      throws IOException,
+      LoginException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    // Do job localization
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    localizedJobConf.setUser(ugi.split(",")[0]);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext context = new JobInitializationContext();
+    context.jobid = jobId;
+    context.user = localizedJobConf.getUser();
+    context.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+    // /////////// The method being tested
+    taskController.initializeJob(context);
+    // ///////////
+
+    UserGroupInformation taskTrackerugi =
+        UserGroupInformation.login(localizedJobConf);
+    for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
+      File jobDir =
+          new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+      // check the private permissions on the job directory
+      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    Path jarsDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
+            .toString()), trackerFConf);
+    dirs.add(jarsDir);
+    dirs.add(new Path(jarsDir, "lib"));
+    for (Path dir : dirs) {
+      checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // job-work dir needs user writable permissions
+    Path jobWorkDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
+            .toString()), trackerFConf);
+    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
+        localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker
+        .getLocalJobConfFile(jobId.toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+        .toString()), trackerFConf));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+    files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+    for (Path file : files) {
+      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+  }
+
+  /**
+   * Test task localization with {@link LinuxTaskController}. Also check the
+   * permissions and file ownership of task related files.
+   */
+  @Override
+  public void testTaskLocalization()
+      throws IOException,
+      LoginException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    localizedJobConf.setUser(ugi.split(",")[0]);
+
+    // Now initialize the job via task-controller so as to set
+    // ownership/permissions of jars, job-work-dir
+    JobInitializationContext jobContext = new JobInitializationContext();
+    jobContext.jobid = jobId;
+    jobContext.user = localizedJobConf.getUser();
+    jobContext.workDir =
+        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+    taskController.initializeJob(jobContext);
+
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+
+    // localize the task.
+    tip.localizeTask(task);
+    TaskRunner runner = task.createRunner(tracker, tip);
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+
+    // Initialize task
+    TaskControllerContext taskContext =
+        new TaskController.TaskControllerContext();
+    taskContext.env =
+        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+    taskContext.task = task;
+    // /////////// The method being tested
+    taskController.initializeTask(taskContext);
+    // ///////////
+
+    // check the private permissions of various directories
+    List<Path> dirs = new ArrayList<Path>();
+    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
+        .toString(), taskId.toString()), trackerFConf));
+    dirs.add(workDir);
+    dirs.add(new Path(workDir, "tmp"));
+    dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+    UserGroupInformation taskTrackerugi =
+        UserGroupInformation.login(localizedJobConf);
+    for (Path dir : dirs) {
+      checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+
+    // check the private permissions of various files
+    List<Path> files = new ArrayList<Path>();
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+        .getJobID().toString(), task.getTaskID().toString(), task
+        .isTaskCleanupTask()), trackerFConf));
+    for (Path file : files) {
+      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
+          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Mar  4 03:42:01 2011
@@ -277,14 +277,12 @@ public class TestMapRed extends TestCase
   private static class MyReduce extends IdentityReducer {
     private JobConf conf;
     private boolean compressInput;
-    private TaskAttemptID taskId;
     private boolean first = true;
       
     @Override
     public void configure(JobConf conf) {
       this.conf = conf;
       compressInput = conf.getCompressMapOutput();
-      taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
     }
       
     public void reduce(WritableComparable key, Iterator values,
@@ -292,9 +290,9 @@ public class TestMapRed extends TestCase
                        ) throws IOException {
       if (first) {
         first = false;
-        MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
+        MapOutputFile mapOutputFile = new MapOutputFile();
         mapOutputFile.setConf(conf);
-        Path input = mapOutputFile.getInputFile(0, taskId);
+        Path input = mapOutputFile.getInputFile(0);
         FileSystem fs = FileSystem.get(conf);
         assertTrue("reduce input exists " + input, fs.exists(input));
         SequenceFile.Reader rdr = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar  4 03:42:01 2011
@@ -135,7 +135,7 @@ public class TestMiniMRWithDFS extends T
       int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
       LOG.debug("Tracker directory: " + localDir);
-      File trackerDir = new File(localDir, "taskTracker");
+      File trackerDir = new File(localDir, TaskTracker.SUBDIR);
       assertTrue("local dir " + localDir + " does not exist.", 
                  localDir.isDirectory());
       assertTrue("task tracker dir " + trackerDir + " does not exist.", 
@@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends T
       }
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
-        if (!("taskTracker".equals(contents[fileIdx]))) {
+        if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
           LOG.debug("Looking at " + name);
           assertTrue("Spurious directory " + name + " found in " +
                      localDir, false);
@@ -158,7 +158,7 @@ public class TestMiniMRWithDFS extends T
       }
       for (int idx = 0; idx < neededDirs.size(); ++idx) {
         String name = neededDirs.get(idx);
-        if (new File(new File(new File(trackerDir, "jobcache"),
+        if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
                               jobIds[idx]), name).isDirectory()) {
           found[idx] = true;
           numNotDel++;

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077111&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar  4 03:42:01 2011
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+import junit.framework.TestCase;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker.
+ * 
+ */
+public class TestTaskTrackerLocalization extends TestCase {
+
+  private File TEST_ROOT_DIR;
+  private File ROOT_MAPRED_LOCAL_DIR;
+  private File HADOOP_LOG_DIR;
+
+  private int numLocalDirs = 6;
+  private static final Log LOG =
+      LogFactory.getLog(TestTaskTrackerLocalization.class);
+
+  protected TaskTracker tracker;
+  protected JobConf trackerFConf;
+  protected JobID jobId;
+  protected TaskAttemptID taskId;
+  protected Task task;
+  protected String[] localDirs;
+  protected static LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator("mapred.local.dir");
+
+  @Override
+  protected void setUp()
+      throws Exception {
+    TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data", "/tmp"),
+            "testTaskTrackerLocalization");
+    if (!TEST_ROOT_DIR.exists()) {
+      TEST_ROOT_DIR.mkdirs();
+    }
+
+    ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+    ROOT_MAPRED_LOCAL_DIR.mkdirs();
+
+    HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
+    HADOOP_LOG_DIR.mkdir();
+    System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
+
+    trackerFConf = new JobConf();
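+    // Run against the local file system; these tests don't need a mini-DFS cluster.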
+    trackerFConf.set("fs.default.name", "file:///");
+    localDirs = new String[numLocalDirs];
+    for (int i = 0; i < numLocalDirs; i++) {
+      localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+    }
+    trackerFConf.setStrings("mapred.local.dir", localDirs);
+
+    // Create the job jar file
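+    // It contains two nested lib jars, so the test can verify that the job jar is unpacked on localization.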
+    File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+    JarOutputStream jstream =
+        new JarOutputStream(new FileOutputStream(jobJarFile));
+    ZipEntry ze = new ZipEntry("lib/lib1.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    ze = new ZipEntry("lib/lib2.jar");
+    jstream.putNextEntry(ze);
+    jstream.closeEntry();
+    jstream.finish();
+    jstream.close();
+    trackerFConf.setJar(jobJarFile.toURI().toString());
+
+    // Create the job configuration file
+    File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+    FileOutputStream out = new FileOutputStream(jobConfFile);
+    trackerFConf.writeXml(out);
+    out.close();
+
+    // Set up the TaskTracker
+    tracker = new TaskTracker();
+    tracker.setConf(trackerFConf);
+    tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+    // Set up the task to be localized
+    String jtIdentifier = "200907202331";
+    jobId = new JobID(jtIdentifier, 1);
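+    // A map-task attempt (isMap = true) with task id 1 and attempt number 0.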
+    taskId =
+        new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
+    task =
+        new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+
+    TaskController taskController = new DefaultTaskController();
+    taskController.setConf(trackerFConf);
+    taskController.setup();
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    FileUtil.fullyDelete(TEST_ROOT_DIR);
+  }
+
+  private static String[] getFilePermissionAttrs(String path)
+      throws IOException {
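+    // 'stat -c %A:%U:%G' prints "<permissions>:<owner>:<group>", e.g. "drwxr-xr-x:someuser:somegroup".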
+    String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
+    return output.split(":|\n");
+  }
+
+  static void checkFilePermissions(String path, String expectedPermissions,
+      String expectedOwnerUser, String expectedOwnerGroup)
+      throws IOException {
+    String[] attrs = getFilePermissionAttrs(path);
+    assertTrue("File attrs length is not 3 but " + attrs.length,
+        attrs.length == 3);
+    assertTrue("Path " + path + " has the permissions " + attrs[0]
+        + " instead of the expected " + expectedPermissions, attrs[0]
+        .equals(expectedPermissions));
+    assertTrue("Path " + path + " is not user owned not by "
+        + expectedOwnerUser + " but by " + attrs[1], attrs[1]
+        .equals(expectedOwnerUser));
+    assertTrue("Path " + path + " is not group owned not by "
+        + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
+        .equals(expectedOwnerGroup));
+  }
+
+  /**
+   * Verify the task-controller's setup functionality
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testTaskControllerSetup()
+      throws IOException,
+      LoginException {
+    // Task-controller is already set up in the test's setup method. Now verify.
+    UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
+    for (String localDir : localDirs) {
+
+      // Verify the local-dir itself.
+      File lDir = new File(localDir);
+      assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
+      checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker.getDistributedCacheDir());
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exists!", distributedCacheDir.exists());
+      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+          "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
+
+      // Verify the job cache dir.
+      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+      assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
+          jobCacheDir.exists());
+      checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+    }
+
+    // Verify the permissions on the userlogs dir
+    File taskLog = TaskLog.getUserLogDir();
+    checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
+        .getUserName(), ugi.getGroupNames()[0]);
+  }
+
+  /**
+ * Test job localization on a TT. Tests localization of job.xml, job.jar and
+ * the corresponding configuration settings.
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testJobLocalization()
+      throws IOException,
+      LoginException {
+
+    // /////////// The main method being tested
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+    // ///////////
+
+    // Check the directory structure
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue("mapred.local.dir " + localDir + " isn'task created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + "is not created!", taskTrackerSubDir.exists());
+
+      File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
+          + " isn'task created!", jobCache.exists());
+
+      File jobDir = new File(jobCache, jobId.toString());
+      assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
+          .exists());
+
+      // check the private permissions on the job directory
+      UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+      checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
+          .getUserName(), ugi.getGroupNames()[0]);
+    }
+
+    // check the localization of job.xml
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+    assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
+        .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
+            trackerFConf) != null);
+
+    // check the localization of job.jar
+    Path jarFileLocalized =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+            .toString()), trackerFConf);
+    assertTrue("job.jar is not localized on this TaskTracker!!",
+        jarFileLocalized != null);
+    assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
+        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
+        .exists());
+    assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
+        jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
+        .exists());
+
+    // check the creation of job work directory
+    assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
+        .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
+            trackerFConf) != null);
+
+    // Check the setting of job.local.dir and job.jar which will eventually be
+    // used by the user's task
+    boolean jobLocalDirFlag = false, mapredJarFlag = false;
+    String localizedJobLocalDir =
+        localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
+    String localizedJobJar = localizedJobConf.getJar();
+    for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+      if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
+          + TaskTracker.getJobWorkDir(jobId.toString()))) {
+        jobLocalDirFlag = true;
+      }
+      if (localizedJobJar.equals(localDir + Path.SEPARATOR
+          + TaskTracker.getJobJarFile(jobId.toString()))) {
+        mapredJarFlag = true;
+      }
+    }
+    assertTrue(TaskTracker.JOB_LOCAL_DIR
+        + " is not set properly to the target users directory : "
+        + localizedJobLocalDir, jobLocalDirFlag);
+    assertTrue(
+        "mapred.jar is not set properly to the target users directory : "
+            + localizedJobJar, mapredJarFlag);
+  }
+
+  /**
+   * Test task localization on a TT.
+   * 
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void testTaskLocalization()
+      throws IOException,
+      LoginException {
+
+    JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+    tip.setJobConf(localizedJobConf);
+
+    // ////////// The central method being tested
+    tip.localizeTask(task);
+    // //////////
+
+    // check the functionality of localizeTask
+    for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
+      assertTrue("attempt-dir in localDir " + dir + " is not created!!",
+          new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
+              .toString())).exists());
+    }
+
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    assertTrue("atttempt work dir for " + taskId.toString()
+        + " is not created in any of the configured dirs!!", workDir != null);
+
+    TaskRunner runner = task.createRunner(tracker, tip);
+
+    // /////// Few more methods being tested
+    runner.setupChildTaskConfiguration(lDirAlloc);
+    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+        localizedJobConf);
+    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+    // ///////
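+    // prepareLogFiles returns the stdout log file at index 0 and the stderr
+    // log file at index 1; this is verified against the expected paths below.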
+
+    // Make sure the task-conf file is created
+    Path localTaskFile =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+            .getJobID().toString(), task.getTaskID().toString(), task
+            .isTaskCleanupTask()), trackerFConf);
+    assertTrue("Task conf file " + localTaskFile.toString()
+        + " is not created!!", new File(localTaskFile.toUri().getPath())
+        .exists());
+
+    // /////// One more method being tested. This happens in child space.
+    JobConf localizedTaskConf = new JobConf(localTaskFile);
+    TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+    // ///////
+
+    // Make sure that the mapred.local.dir is sandboxed
+    for (String childMapredLocalDir : localizedTaskConf
+        .getStrings("mapred.local.dir")) {
+      assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
+          childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
+              .toString(), taskId.toString(), false)));
+    }
+
+    // Make sure that task.getJobFile is changed and points correctly.
+    assertTrue(task.getJobFile().endsWith(
+        TaskTracker
+            .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+
+    // Make sure that the tmp directories are created
+    assertTrue("tmp dir is not created in workDir "
+        + workDir.toUri().getPath(),
+        new File(workDir.toUri().getPath(), "tmp").exists());
+
+    // Make sure that the logs are set up properly
+    File logDir =
+        new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+            + task.getTaskID().toString());
+    assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
+        logDir.exists());
+    UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+    checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
+        .getUserName(), ugi.getGroupNames()[0]);
+
+    File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
+    assertTrue("stdout log file is improper. Expected : "
+        + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
+        expectedStdout.toString().equals(logFiles[0].toString()));
+    File expectedStderr =
+        new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
+    assertTrue("stderr log file is improper. Expected : "
+        + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
+        expectedStderr.toString().equals(logFiles[1].toString()));
+  }
+}