You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:02 UTC
svn commit: r1077111 [3/3] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ conf/
src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/docs/src/documentation/c...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:42:01 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.TaskStatus.Phase;
@@ -219,13 +220,19 @@ public class TaskTracker
//for serving map output to the other nodes
static Random r = new Random();
- private static final String SUBDIR = "taskTracker";
- private static final String CACHEDIR = "archive";
- private static final String JOBCACHE = "jobcache";
- private static final String OUTPUT = "output";
+ static final String SUBDIR = "taskTracker";
+ private static final String DISTCACHEDIR = "distcache";
+ static final String JOBCACHE = "jobcache";
+ static final String OUTPUT = "output";
+ private static final String JARSDIR = "jars";
+ static final String LOCAL_SPLIT_FILE = "split.info";
+ static final String JOBFILE = "job.xml";
+
+ static final String JOB_LOCAL_DIR = "job.local.dir";
static final String JOB_TOKEN_FILE="jobToken"; //localized file
- private JobConf originalConf;
+
private JobConf fConf;
+ private JobConf originalConf;
private int maxMapSlots;
private int maxReduceSlots;
private int failures;
@@ -435,8 +442,8 @@ public class TaskTracker
return TaskTracker.SUBDIR + Path.SEPARATOR + user;
}
- static String getCacheSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+ static String getDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
static String getJobCacheSubdir() {
@@ -449,31 +456,66 @@ public class TaskTracker
}
static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
- static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false) ;
+ static String getLocalJobConfFile(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
}
- static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid)
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ static String getTaskConfFile(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+ + TaskTracker.JOBFILE;
}
-
- static String getLocalJobTokenFile(String user, String jobid) {
- return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+
+ static String getJobJarsDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+ }
+
+ static String getJobJarFile(String jobid) {
+ return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+ }
+
+ static String getJobWorkDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
}
+ static String getLocalSplitFile(String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.LOCAL_SPLIT_FILE;
+ }
+
+ static String getIntermediateOutputDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
+ }
- static String getLocalTaskDir(String jobid,
- String taskid,
- boolean isCleanupAttempt) {
- String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
- if (isCleanupAttempt) {
+ static String getLocalTaskDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid, false);
+ }
+
+ static String getLocalTaskDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
- }
- return taskDir;
+ }
+ return taskDir;
+ }
+
+ static String getTaskWorkDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir =
+ getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ dir = dir + TASK_CLEANUP_SUFFIX;
+ }
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalJobTokenFile(String user, String jobid) {
+ return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
}
private void setUgi(String user, Configuration conf) {
@@ -841,92 +883,25 @@ public class TaskTracker
Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
- Path jobFile = new Path(t.getJobFile());
- String userName = t.getUser();
- JobConf userConf = new JobConf(getJobConf());
- setUgi(userName, userConf);
- FileSystem userFs = jobFile.getFileSystem(userConf);
- // Get sizes of JobFile and JarFile
- // sizes are -1 if they are not present.
- FileStatus status = null;
- long jobFileSize = -1;
- try {
- status = userFs.getFileStatus(jobFile);
- jobFileSize = status.getLen();
- } catch(FileNotFoundException fe) {
- jobFileSize = -1;
- }
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
+
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if (!rjob.localized) {
-
- FileSystem localFs = FileSystem.getLocal(fConf);
- // this will happen on a partial execution of localizeJob.
- // Sometimes the job.xml gets copied but copying job.jar
- // might throw out an exception
- // we should clean up and then try again
- Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)){
- localFs.delete(jobDir, true);
- boolean b = localFs.mkdirs(jobDir);
- if (!b)
- throw new IOException("Not able to create job directory "
- + jobDir.toString());
- }
- userFs.copyToLocalFile(jobFile, localJobFile);
- JobConf localJobConf = new JobConf(localJobFile);
-
- // create the 'work' directory
- // job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite(
- (getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty("job.local.dir", workDir.toString());
- localJobConf.set("job.local.dir", workDir.toString());
-
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- try {
- status = userFs.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch(FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for and we check five times the size of jarFileSize
- // to accommodate for unjarring the jar file in work directory
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar");
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
- userFs.copyToLocalFile(jarFilePath, localJarFile);
- localJobConf.setJar(localJarFile.toString());
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
- // also unjar the job.jar files
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
- }
+ JobConf localJobConf = localizeJobFiles(t);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir. Note that initializeJob
+ // should be the last call after every other directory/file to be
+ // directly under the job directory is created.
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = localJobConf.getUser();
+ context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
+ taskController.initializeJob(context);
+
+ rjob.jobConf = localJobConf;
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
- rjob.jobConf = localJobConf;
// save local copy of JobToken file
localizeJobTokenFile(t.getUser(), jobId, localJobConf);
FSDataInputStream in = localFs.open(new Path(
@@ -936,13 +911,319 @@ public class TaskTracker
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
rjob.localized = true;
- taskController.initializeJob(jobId);
}
}
launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
- private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+ /**
+ * Localize the job on this tasktracker. Specifically
+ * <ul>
+ * <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the job config file job.xml from the FS</li>
+ * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+ * in the configuration.
+ * <li>Download the job jar file job.jar from the FS, unjar it and set jar
+ * file in the configuration.</li>
+ * </ul>
+ *
+ * @param t task whose job has to be localized on this TT
+ * @return the modified job configuration to be used for all the tasks of this
+ * job as a starting point.
+ * @throws IOException
+ */
+ JobConf localizeJobFiles(Task t)
+ throws IOException {
+ JobID jobId = t.getJobID();
+
+ Path jobFile = new Path(t.getJobFile());
+ String userName = t.getUser();
+ JobConf userConf = new JobConf(getJobConf());
+ setUgi(userName, userConf);
+ FileSystem userFs = jobFile.getFileSystem(userConf);
+
+ // Initialize the job directories first
+ FileSystem localFs = FileSystem.getLocal(fConf);
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+ // Download the job.xml for this job from the system FS
+ Path localJobFile =
+ localizeJobConfFile(new Path(t.getJobFile()), userFs, jobId);
+
+ JobConf localJobConf = new JobConf(localJobFile);
+
+ // create the 'job-work' directory: job-specific shared directory for use as
+ // scratch space by all tasks of the same job running on this TaskTracker.
+ Path workDir =
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+ fConf);
+ if (!localFs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+ localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+ // Download the job.jar for this job from the system FS
+ localizeJobJarFile(jobId, userFs, localJobConf);
+
+ return localJobConf;
+ }
+
+ static class PermissionsHandler {
+ /**
+ * Permission information useful for setting permissions for a given path.
+ * Using this, one can set all possible combinations of permissions for the
+ * owner of the file. But permissions for the group and all others can only
+ * be set together, i.e. permissions for group cannot be set different from
+ * those for others and vice versa.
+ */
+ static class PermissionsInfo {
+ public boolean readPermissions;
+ public boolean writePermissions;
+ public boolean executablePermissions;
+ public boolean readPermsOwnerOnly;
+ public boolean writePermsOwnerOnly;
+ public boolean executePermsOwnerOnly;
+
+ /**
+ * Create a permissions-info object with the given attributes
+ *
+ * @param readPerms
+ * @param writePerms
+ * @param executePerms
+ * @param readOwnerOnly
+ * @param writeOwnerOnly
+ * @param executeOwnerOnly
+ */
+ public PermissionsInfo(boolean readPerms, boolean writePerms,
+ boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+ boolean executeOwnerOnly) {
+ readPermissions = readPerms;
+ writePermissions = writePerms;
+ executablePermissions = executePerms;
+ readPermsOwnerOnly = readOwnerOnly;
+ writePermsOwnerOnly = writeOwnerOnly;
+ executePermsOwnerOnly = executeOwnerOnly;
+ }
+ }
+
+ /**
+ * Set permission on the given file path using the specified permissions
+ * information. We use java api to set permission instead of spawning chmod
+ * processes. This saves a lot of time. Using this, one can set all possible
+ * combinations of permissions for the owner of the file. But permissions
+ * for the group and all others can only be set together, i.e. permissions
+ * for group cannot be set different from those for others and vice versa.
+ *
+ * This method should satisfy the needs of most of the applications. For
+ * those it doesn't, {@link FileUtil#chmod} can be used.
+ *
+ * @param f file path
+ * @param pInfo permissions information
+ * @return true if success, false otherwise
+ */
+ static boolean setPermissions(File f, PermissionsInfo pInfo) {
+ if (pInfo == null) {
+ LOG.debug(" PermissionsInfo is null, returning.");
+ return true;
+ }
+
+ LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+ boolean ret = true;
+
+ // Clear all the flags
+ ret = f.setReadable(false, false) && ret;
+ ret = f.setWritable(false, false) && ret;
+ ret = f.setExecutable(false, false) && ret;
+
+ ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
+ LOG.debug("Readable status for " + f + " set to " + ret);
+ ret =
+ f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
+ && ret;
+ LOG.debug("Writable status for " + f + " set to " + ret);
+ ret =
+ f.setExecutable(pInfo.executablePermissions,
+ pInfo.executePermsOwnerOnly)
+ && ret;
+
+ LOG.debug("Executable status for " + f + " set to " + ret);
+ return ret;
+ }
+
+ /**
+ * Permissions rwxr_xr_x
+ */
+ static PermissionsInfo sevenFiveFive =
+ new PermissionsInfo(true, true, true, false, true, false);
+ /**
+ * Completely private permissions
+ */
+ static PermissionsInfo sevenZeroZero =
+ new PermissionsInfo(true, true, true, true, true, true);
+ }
+
+ /**
+ * Prepare the job directories for a given job. To be called by the job
+ * localization code, only if the job is not already localized.
+ *
+ * <br>
+ * Here, we set 700 permissions on the job directories created on all disks.
+ * This we do so as to avoid any misuse by other users till the time
+ * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+ * later time to set proper private permissions on the job directories. <br>
+ *
+ * @param jobId
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private static void initializeJobDirs(JobID jobId, FileSystem fs,
+ String[] localDirs)
+ throws IOException {
+ boolean initJobDirStatus = false;
+ String jobDirPath = getLocalJobDir(jobId.toString());
+ for (String localDir : localDirs) {
+ Path jobDir = new Path(localDir, jobDirPath);
+ if (fs.exists(jobDir)) {
+ // this will happen on a partial execution of localizeJob. Sometimes
+ // copying job.xml to the local disk succeeds but copying job.jar might
+ // throw out an exception. We should clean up and then try again.
+ fs.delete(jobDir, true);
+ }
+
+ boolean jobDirStatus = fs.mkdirs(jobDir);
+ if (!jobDirStatus) {
+ LOG.warn("Not able to create job directory " + jobDir.toString());
+ }
+
+ initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+ // job-dir has to be private to the TT
+ PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
+ PermissionsHandler.sevenZeroZero);
+ }
+
+ if (!initJobDirStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobId.toString());
+ }
+ }
+
+ /**
+ * Download the job configuration file from the FS.
+ *
+ * @param t Task whose job file has to be downloaded
+ * @param jobId jobid of the task
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private Path localizeJobConfFile(Path jobFile, FileSystem userFs, JobID jobId)
+ throws IOException {
+ // Get sizes of JobFile and JarFile
+ // sizes are -1 if they are not present.
+ FileStatus status = null;
+ long jobFileSize = -1;
+ try {
+ status = userFs.getFileStatus(jobFile);
+ jobFileSize = status.getLen();
+ } catch(FileNotFoundException fe) {
+ jobFileSize = -1;
+ }
+ Path localJobFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+ jobFileSize, fConf);
+
+ // Download job.xml
+ userFs.copyToLocalFile(jobFile, localJobFile);
+ return localJobFile;
+ }
+
+ /**
+ * Download the job jar file from FS to the local file system and unjar it.
+ * Set the local jar file in the passed configuration.
+ *
+ * @param jobId
+ * @param userFs
+ * @param localJobConf
+ * @throws IOException
+ */
+ private void localizeJobJarFile(JobID jobId, FileSystem userFs,
+ JobConf localJobConf)
+ throws IOException {
+ // copy Jar file to the local FS and unjar it.
+ String jarFile = localJobConf.getJar();
+ FileStatus status = null;
+ long jarFileSize = -1;
+ if (jarFile != null) {
+ Path jarFilePath = new Path(jarFile);
+ try {
+ status = userFs.getFileStatus(jarFilePath);
+ jarFileSize = status.getLen();
+ } catch (FileNotFoundException fe) {
+ jarFileSize = -1;
+ }
+ // Here we check for and we check five times the size of jarFileSize
+ // to accommodate for unjarring the jar file in userfiles directory
+ Path localJarFile =
+ lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+ 5 * jarFileSize, fConf);
+
+ //Download job.jar
+ userFs.copyToLocalFile(jarFilePath, localJarFile);
+
+ localJobConf.setJar(localJarFile.toString());
+
+ // Also un-jar the job.jar files. We un-jar it so that classes inside
+ // sub-directories, for e.g., lib/, classes/ are available on class-path
+ RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
+ .getParent().toString()));
+ }
+ }
+
+ /**
+ * Create taskDirs on all the disks. Otherwise, in some cases, like when
+ * LinuxTaskController is in use, child might wish to balance load across
+ * disks but cannot itself create attempt directory because of the fact that
+ * job directory is writable only by the TT.
+ *
+ * @param jobId
+ * @param attemptId
+ * @param isCleanupAttempt
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private static void initializeAttemptDirs(String jobId, String attemptId,
+ boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+ throws IOException {
+
+ boolean initStatus = false;
+ String attemptDirPath =
+ getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+ for (String localDir : localDirs) {
+ Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+ boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+ if (!attemptDirStatus) {
+ LOG.warn("localAttemptDir " + localAttemptDir.toString()
+ + " couldn't be created.");
+ }
+ initStatus = initStatus || attemptDirStatus;
+ }
+
+ if (!initStatus) {
+ throw new IOException("Not able to initialize attempt directories "
+ + "in any of the configured local directories for the attempt "
+ + attemptId);
+ }
+ }
+ private void launchTaskForJob(TaskInProgress tip, JobConf jobConf)
+ throws IOException{
synchronized (tip) {
tip.setJobConf(jobConf);
tip.launchTask();
@@ -1021,6 +1302,17 @@ public class TaskTracker
}
/**
+ * For testing
+ */
+ TaskTracker() {
+ server = null;
+ }
+
+ void setConf(JobConf conf) {
+ fConf = conf;
+ }
+
+ /**
* Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
@@ -1061,13 +1353,6 @@ public class TaskTracker
initialize();
}
- /**
- * Blank constructor. Only usable by tests.
- */
- TaskTracker() {
- server = null;
- }
-
private void checkJettyPort(int port) throws IOException {
//See HADOOP-4744
if (port < 0) {
@@ -1695,10 +1980,9 @@ public class TaskTracker
}
MapOutputFile mapOutputFile = new MapOutputFile();
- mapOutputFile.setJobId(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ Path tmp_output = mapOutputFile.getOutputFile();
if(tmp_output == null)
return 0;
FileSystem localFS = FileSystem.getLocal(conf);
@@ -1988,54 +2272,36 @@ public class TaskTracker
taskTimeout = (10 * 60 * 1000);
}
- private void localizeTask(Task task) throws IOException{
+ void localizeTask(Task task) throws IOException{
- Path localTaskDir =
- lDirAlloc.getLocalPathForWrite(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask()),
- defaultJobConf );
-
FileSystem localFs = FileSystem.getLocal(fConf);
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create "
- + localTaskDir.toString());
- }
- // create symlink for ../work if it already doesnt exist
- String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalJobDir(task.getJobID().toString())
- + Path.SEPARATOR
- + "work", defaultJobConf).toString();
- String link = localTaskDir.getParent().toString()
- + Path.SEPARATOR + "work";
- File flink = new File(link);
- if (!flink.exists())
- FileUtil.symLink(workDir, link);
-
+ // create taskDirs on all the disks.
+ initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask(), localFs, fConf
+ .getStrings("mapred.local.dir"));
+
// create the working-directory of the task
- Path cwd = lDirAlloc.getLocalPathForWrite(
- getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ Path cwd =
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
- Path localTaskFile = new Path(localTaskDir, "job.xml");
- task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+
if (fConf.get("slave.host.name") != null) {
localJobConf.set("slave.host.name",
fConf.get("slave.host.name"));
}
- localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ // Do the task-type specific localization
task.localizeConfiguration(localJobConf);
List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
@@ -2071,12 +2337,6 @@ public class TaskTracker
if (isTaskMemoryManagerEnabled()) {
localJobConf.setBoolean("task.memory.mgmt.enabled", true);
}
- OutputStream out = localFs.create(localTaskFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
task.setConf(localJobConf);
}
@@ -2349,7 +2609,7 @@ public class TaskTracker
localJobConf). toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
- "doesnt exist. Caught exception " +
+ " doesnt exist. Caught exception " +
StringUtils.stringifyException(e));
}
// Build the command
@@ -2630,34 +2890,39 @@ public class TaskTracker
if (localJobConf == null) {
return;
}
- String taskDir = getLocalTaskDir(task.getJobID().toString(),
- taskId.toString(), task.isTaskCleanupTask());
+ String localTaskDir =
+ getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
+ String taskWorkDir =
+ getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
//and reduce inputs get stored)
runner.close();
}
- //We don't delete the workdir
- //since some other task (running in the same JVM)
- //might be using the dir. The JVM running the tasks would clean
- //the workdir per a task in the task process itself.
+
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ // No jvm reuse, remove everything
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir));
+ localTaskDir));
}
-
else {
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- taskDir+"/job.xml"));
+ // Jvm reuse. We don't delete the workdir since some other task
+ // (running in the same JVM) might be using the dir. The JVM
+ // running the tasks would clean the workdir per a task in the
+ // task process itself.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + Path.SEPARATOR
+ + TaskTracker.JOBFILE));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir+"/work"));
+ taskWorkDir));
}
}
} catch (Throwable ie) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar 4 03:42:01 2011
@@ -47,7 +47,7 @@ import junit.framework.TestCase;
* <li>Make the built binary to setuid executable</li>
* <li>Execute following targets:
* <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
- * -Dtaskcontroller-user=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
* </ol>
*
*/
@@ -82,6 +82,9 @@ public class ClusterWithLinuxTaskControl
private static final int NUMBER_OF_NODES = 1;
+ static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+ static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
private File configurationFile = null;
private UserGroupInformation taskControllerUser;
@@ -98,18 +101,20 @@ public class ClusterWithLinuxTaskControl
MyLinuxTaskController.class.getName());
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
- .toString(), 1, null, null, conf);
+ .toString(), 4, null, null, conf);
// Get the configured taskcontroller-path
- String path = System.getProperty("taskcontroller-path");
- createTaskControllerConf(path);
+ String path = System.getProperty(TASKCONTROLLER_PATH);
+ configurationFile =
+ createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
+ .getLocalDirs());
String execPath = path + "/task-controller";
TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
// TypeCasting the parent to our TaskController instance as we
// know that that would be instance which should be present in TT.
((MyLinuxTaskController) tracker.getTaskController())
.setTaskControllerExe(execPath);
- String ugi = System.getProperty("taskcontroller-user");
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
clusterConf = mrCluster.createJobConf();
String[] splits = ugi.split(",");
taskControllerUser = new UnixUserGroupInformation(splits);
@@ -140,21 +145,39 @@ public class ClusterWithLinuxTaskControl
taskControllerUser.getGroupNames()[0]);
}
- private void createTaskControllerConf(String path)
+ /**
+ * Create taskcontroller.cfg.
+ *
+ * @param path Path to the taskcontroller binary.
+ * @param localDirs
+ * @return the created conf file
+ * @throws IOException
+ */
+ static File createTaskControllerConf(String path, String[] localDirs)
throws IOException {
File confDirectory = new File(path, "../conf");
if (!confDirectory.exists()) {
confDirectory.mkdirs();
}
- configurationFile = new File(confDirectory, "taskcontroller.cfg");
+ File configurationFile = new File(confDirectory, "taskcontroller.cfg");
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
- writer.println(String.format("mapred.local.dir=%s", mrCluster
- .getTaskTrackerLocalDir(0)));
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < localDirs.length; i++) {
+ sb.append(localDirs[i]);
+ if ((i + 1) != localDirs.length) {
+ sb.append(",");
+ }
+ }
+ writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+ writer
+ .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
writer.flush();
writer.close();
+ return configurationFile;
}
/**
@@ -162,28 +185,35 @@ public class ClusterWithLinuxTaskControl
*
* @return boolean
*/
- protected boolean shouldRun() {
- return isTaskExecPathPassed() && isUserPassed();
+ protected static boolean shouldRun() {
+ if (!isTaskExecPathPassed() || !isUserPassed()) {
+ LOG.info("Not running test.");
+ return false;
+ }
+ return true;
}
- private boolean isTaskExecPathPassed() {
- String path = System.getProperty("taskcontroller-path");
+ private static boolean isTaskExecPathPassed() {
+ String path = System.getProperty(TASKCONTROLLER_PATH);
if (path == null || path.isEmpty()
- || path.equals("${taskcontroller-path}")) {
+ || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+ LOG.info("Invalid taskcontroller-path : " + path);
return false;
}
return true;
}
- private boolean isUserPassed() {
- String ugi = System.getProperty("taskcontroller-user");
- if (ugi != null && !(ugi.equals("${taskcontroller-user}"))
+ private static boolean isUserPassed() {
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
+ if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
&& !ugi.isEmpty()) {
if (ugi.indexOf(",") > 1) {
return true;
}
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 4 03:42:01 2011
@@ -163,7 +163,7 @@ public class MiniMRCluster {
StringBuffer localPath = new StringBuffer();
for(int i=0; i < numDir; ++i) {
File ttDir = new File(localDirBase,
- Integer.toString(trackerId) + "_" + 0);
+ Integer.toString(trackerId) + "_" + i);
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + ttDir);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java Fri Mar 4 03:42:01 2011
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -99,20 +100,17 @@ public class TestIsolationRunner extends
private Path getAttemptJobXml(JobConf conf, JobID jobId, boolean isMap)
throws IOException {
- String[] localDirs = conf.getLocalDirs();
- assertEquals(1, localDirs.length);
- Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
- "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);
- Path attemptDir = new Path(jobCacheDir,
- new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString());
- return new Path(attemptDir, "job.xml");
+ String taskid =
+ new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString();
+ return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+ TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
}
public void testIsolationRunOfMapTask() throws
IOException, InterruptedException, ClassNotFoundException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(1, "file:///", 1);
+ mr = new MiniMRCluster(1, "file:///", 4);
// Run a job succesfully; keep task files.
JobConf conf = mr.createJobConf();
@@ -131,7 +129,9 @@ public class TestIsolationRunner extends
// run IsolationRunner against the map task.
FileSystem localFs = FileSystem.getLocal(conf);
Path mapJobXml =
- getAttemptJobXml(conf, jobId, true).makeQualified(localFs);
+ getAttemptJobXml(
+ mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(),
+ jobId, true).makeQualified(localFs);
assertTrue(localFs.exists(mapJobXml));
new IsolationRunner().run(new String[] {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Fri Mar 4 03:42:01 2011
@@ -20,14 +20,11 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
/**
* Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -43,42 +40,34 @@ public class TestJobExecutionAsDifferent
return;
}
startCluster();
- submitWordCount(getClusterConf());
- }
-
- private void submitWordCount(JobConf clientConf) throws IOException {
- Path inDir = new Path("testing/wc/input");
- Path outDir = new Path("testing/wc/output");
- JobConf conf = new JobConf(clientConf);
- FileSystem fs = FileSystem.get(conf);
- fs.delete(outDir, true);
- if (!fs.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
+ Path inDir = new Path("input");
+ Path outDir = new Path("output");
+
+ RunningJob job;
- DataOutputStream file = fs.create(new Path(inDir, "part-0"));
- file.writeBytes("a b c d e f g h");
- file.close();
-
- conf.setJobName("wordcount");
- conf.setInputFormat(TextInputFormat.class);
-
- // the keys are words (strings)
- conf.setOutputKeyClass(Text.class);
- // the values are counts (ints)
- conf.setOutputValueClass(IntWritable.class);
-
- conf.setMapperClass(WordCount.MapClass.class);
- conf.setCombinerClass(WordCount.Reduce.class);
- conf.setReducerClass(WordCount.Reduce.class);
-
- FileInputFormat.setInputPaths(conf, inDir);
- FileOutputFormat.setOutputPath(conf, outDir);
- conf.setNumMapTasks(1);
- conf.setNumReduceTasks(1);
- RunningJob rj = JobClient.runJob(conf);
- assertTrue("Job Failed", rj.isSuccessful());
+ // Run a job with zero maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
+
+ // Run a job with 1 map and zero reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a normal job with maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with jvm reuse
+ JobConf myConf = getClusterConf();
+ myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+ String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+ assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
}
public void testEnvironment() throws IOException {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Fri Mar 4 03:42:01 2011
@@ -350,15 +350,26 @@ public class TestKillSubProcesses extend
if (ProcessTree.isSetsidAvailable) {
FileSystem fs = FileSystem.getLocal(conf);
- if(fs.exists(scriptDir)){
+ if (fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
- // create shell script
- Random rm = new Random();
+
+ // Create the directory and set open permissions so that the TT can
+ // access.
+ fs.mkdirs(scriptDir);
+ fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+ FsAction.ALL));
+
+ // create shell script
+ Random rm = new Random();
Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
+ ".sh");
String shellScript = scriptPath.toString();
+
+ // Construct the script. Set umask to 0000 so that TT can access all the
+ // files.
String script =
+ "umask 000\n" +
"echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
"echo hello\n" +
"trap 'echo got SIGTERM' 15 \n" +
@@ -373,7 +384,10 @@ public class TestKillSubProcesses extend
file.writeBytes(script);
file.close();
- LOG.info("Calling script from map task of failjob : " + shellScript);
+ // Set executable permissions on the script.
+ new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+ LOG.info("Calling script from map task : " + shellScript);
Runtime.getRuntime()
.exec(shellScript + " " + numLevelsOfSubProcesses);
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077111&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:42:01 2011
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ *
+ */
+public class TestLocalizationWithLinuxTaskController extends
+ TestTaskTrackerLocalization {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+ private File configFile;
+ private MyLinuxTaskController taskController;
+
+ @Override
+ protected void setUp()
+ throws Exception {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ super.setUp();
+
+ taskController = new MyLinuxTaskController();
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ configFile =
+ ClusterWithLinuxTaskController.createTaskControllerConf(path,
+ localDirs);
+ String execPath = path + "/task-controller";
+ taskController.setTaskControllerExe(execPath);
+ taskController.setConf(trackerFConf);
+ taskController.setup();
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+ super.tearDown();
+ if (configFile != null) {
+ configFile.delete();
+ }
+ }
+
+ /** @InheritDoc */
+ @Override
+ public void testTaskControllerSetup() {
+ // Do nothing.
+ }
+
+ /**
+ * Test job localization with {@link LinuxTaskController}. Also check the
+ * permissions and file ownership of the job related files.
+ */
+ @Override
+ public void testJobLocalization()
+ throws IOException,
+ LoginException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ // Do job localization
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ localizedJobConf.setUser(ugi.split(",")[0]);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = localizedJobConf.getUser();
+ context.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+ // /////////// The method being tested
+ taskController.initializeJob(context);
+ // ///////////
+
+ UserGroupInformation taskTrackerugi =
+ UserGroupInformation.login(localizedJobConf);
+ for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
+ File jobDir =
+ new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+ // check the private permissions on the job directory
+ checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ Path jarsDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
+ .toString()), trackerFConf);
+ dirs.add(jarsDir);
+ dirs.add(new Path(jarsDir, "lib"));
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // job-work dir needs user writable permissions
+ Path jobWorkDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
+ .toString()), trackerFConf);
+ checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker
+ .getLocalJobConfFile(jobId.toString()), trackerFConf));
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+ .toString()), trackerFConf));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+ }
+
+ /**
+ * Test task localization with {@link LinuxTaskController}. Also check the
+ * permissions and file ownership of task related files.
+ */
+ @Override
+ public void testTaskLocalization()
+ throws IOException,
+ LoginException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ localizedJobConf.setUser(ugi.split(",")[0]);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = localizedJobConf.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
+
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+
+ // localize the task.
+ tip.localizeTask(task);
+ TaskRunner runner = task.createRunner(tracker, tip);
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+
+ // Initialize task
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// The method being tested
+ taskController.initializeTask(taskContext);
+ // ///////////
+
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
+ .toString(), taskId.toString()), trackerFConf));
+ dirs.add(workDir);
+ dirs.add(new Path(workDir, "tmp"));
+ dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+ UserGroupInformation taskTrackerugi =
+ UserGroupInformation.login(localizedJobConf);
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Mar 4 03:42:01 2011
@@ -277,14 +277,12 @@ public class TestMapRed extends TestCase
private static class MyReduce extends IdentityReducer {
private JobConf conf;
private boolean compressInput;
- private TaskAttemptID taskId;
private boolean first = true;
@Override
public void configure(JobConf conf) {
this.conf = conf;
compressInput = conf.getCompressMapOutput();
- taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
}
public void reduce(WritableComparable key, Iterator values,
@@ -292,9 +290,9 @@ public class TestMapRed extends TestCase
) throws IOException {
if (first) {
first = false;
- MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
+ MapOutputFile mapOutputFile = new MapOutputFile();
mapOutputFile.setConf(conf);
- Path input = mapOutputFile.getInputFile(0, taskId);
+ Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:42:01 2011
@@ -135,7 +135,7 @@ public class TestMiniMRWithDFS extends T
int numNotDel = 0;
File localDir = new File(mr.getTaskTrackerLocalDir(i));
LOG.debug("Tracker directory: " + localDir);
- File trackerDir = new File(localDir, "taskTracker");
+ File trackerDir = new File(localDir, TaskTracker.SUBDIR);
assertTrue("local dir " + localDir + " does not exist.",
localDir.isDirectory());
assertTrue("task tracker dir " + trackerDir + " does not exist.",
@@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends T
}
for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
String name = contents[fileIdx];
- if (!("taskTracker".equals(contents[fileIdx]))) {
+ if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
LOG.debug("Looking at " + name);
assertTrue("Spurious directory " + name + " found in " +
localDir, false);
@@ -158,7 +158,7 @@ public class TestMiniMRWithDFS extends T
}
for (int idx = 0; idx < neededDirs.size(); ++idx) {
String name = neededDirs.get(idx);
- if (new File(new File(new File(trackerDir, "jobcache"),
+ if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
jobIds[idx]), name).isDirectory()) {
found[idx] = true;
numNotDel++;
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077111&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:42:01 2011
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+import junit.framework.TestCase;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker.
+ *
+ */
+public class TestTaskTrackerLocalization extends TestCase {
+
+ private File TEST_ROOT_DIR;
+ private File ROOT_MAPRED_LOCAL_DIR;
+ private File HADOOP_LOG_DIR;
+
+ private int numLocalDirs = 6;
+ private static final Log LOG =
+ LogFactory.getLog(TestTaskTrackerLocalization.class);
+
+ protected TaskTracker tracker;
+ protected JobConf trackerFConf;
+ protected JobID jobId;
+ protected TaskAttemptID taskId;
+ protected Task task;
+ protected String[] localDirs;
+ protected static LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
+
+ @Override
+ protected void setUp()
+ throws Exception {
+ TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ "testTaskTrackerLocalization");
+ if (!TEST_ROOT_DIR.exists()) {
+ TEST_ROOT_DIR.mkdirs();
+ }
+
+ ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+ ROOT_MAPRED_LOCAL_DIR.mkdirs();
+
+ HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
+ HADOOP_LOG_DIR.mkdir();
+ System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
+
+ trackerFConf = new JobConf();
+ trackerFConf.set("fs.default.name", "file:///");
+ localDirs = new String[numLocalDirs];
+ for (int i = 0; i < numLocalDirs; i++) {
+ localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+ }
+ trackerFConf.setStrings("mapred.local.dir", localDirs);
+
+ // Create the job jar file
+ File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+ JarOutputStream jstream =
+ new JarOutputStream(new FileOutputStream(jobJarFile));
+ ZipEntry ze = new ZipEntry("lib/lib1.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ ze = new ZipEntry("lib/lib2.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ jstream.finish();
+ jstream.close();
+ trackerFConf.setJar(jobJarFile.toURI().toString());
+
+ // Create the job configuration file
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ trackerFConf.writeXml(out);
+ out.close();
+
+ // Set up the TaskTracker
+ tracker = new TaskTracker();
+ tracker.setConf(trackerFConf);
+ tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+ // Set up the task to be localized
+ String jtIdentifier = "200907202331";
+ jobId = new JobID(jtIdentifier, 1);
+ taskId =
+ new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
+ task =
+ new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+
+ TaskController taskController = new DefaultTaskController();
+ taskController.setConf(trackerFConf);
+ taskController.setup();
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ FileUtil.fullyDelete(TEST_ROOT_DIR);
+ }
+
+ private static String[] getFilePermissionAttrs(String path)
+ throws IOException {
+ String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
+ return output.split(":|\n");
+ }
+
+ static void checkFilePermissions(String path, String expectedPermissions,
+ String expectedOwnerUser, String expectedOwnerGroup)
+ throws IOException {
+ String[] attrs = getFilePermissionAttrs(path);
+ assertTrue("File attrs length is not 3 but " + attrs.length,
+ attrs.length == 3);
+ assertTrue("Path " + path + " has the permissions " + attrs[0]
+ + " instead of the expected " + expectedPermissions, attrs[0]
+ .equals(expectedPermissions));
+ assertTrue("Path " + path + " is not user owned not by "
+ + expectedOwnerUser + " but by " + attrs[1], attrs[1]
+ .equals(expectedOwnerUser));
+ assertTrue("Path " + path + " is not group owned not by "
+ + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
+ .equals(expectedOwnerGroup));
+ }
+
+ /**
+ * Verify the task-controller's setup functionality
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testTaskControllerSetup()
+ throws IOException,
+ LoginException {
+ // Task-controller is already set up in the test's setup method. Now verify.
+ UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
+ for (String localDir : localDirs) {
+
+ // Verify the local-dir itself.
+ File lDir = new File(localDir);
+ assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
+ checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+
+ // Verify the distributed cache dir.
+ File distributedCacheDir =
+ new File(localDir, TaskTracker.getDistributedCacheDir());
+ assertTrue("distributed cache dir " + distributedCacheDir
+ + " doesn't exists!", distributedCacheDir.exists());
+ checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+ "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
+
+ // Verify the job cache dir.
+ File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+ assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
+ jobCacheDir.exists());
+ checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ // Verify the pemissions on the userlogs dir
+ File taskLog = TaskLog.getUserLogDir();
+ checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ /**
+ * Test job localization on a TT. Tests localization of job.xml, job.jar and
+ * corresponding setting of configuration.
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testJobLocalization()
+ throws IOException,
+ LoginException {
+
+ // /////////// The main method being tested
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ // ///////////
+
+ // Check the directory structure
+ for (String dir : localDirs) {
+
+ File localDir = new File(dir);
+ assertTrue("mapred.local.dir " + localDir + " isn'task created!",
+ localDir.exists());
+
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ + "is not created!", taskTrackerSubDir.exists());
+
+ File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
+ assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
+ + " isn'task created!", jobCache.exists());
+
+ File jobDir = new File(jobCache, jobId.toString());
+ assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
+ .exists());
+
+ // check the private permissions on the job directory
+ UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+ checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ // check the localization of job.xml
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+ assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
+ trackerFConf) != null);
+
+ // check the localization of job.jar
+ Path jarFileLocalized =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+ .toString()), trackerFConf);
+ assertTrue("job.jar is not localized on this TaskTracker!!",
+ jarFileLocalized != null);
+ assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
+ .exists());
+ assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
+ .exists());
+
+ // check the creation of job work directory
+ assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
+ trackerFConf) != null);
+
+ // Check the setting of job.local.dir and job.jar which will eventually be
+ // used by the user's task
+ boolean jobLocalDirFlag = false, mapredJarFlag = false;
+ String localizedJobLocalDir =
+ localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
+ String localizedJobJar = localizedJobConf.getJar();
+ for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+ if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobWorkDir(jobId.toString()))) {
+ jobLocalDirFlag = true;
+ }
+ if (localizedJobJar.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobJarFile(jobId.toString()))) {
+ mapredJarFlag = true;
+ }
+ }
+ assertTrue(TaskTracker.JOB_LOCAL_DIR
+ + " is not set properly to the target users directory : "
+ + localizedJobLocalDir, jobLocalDirFlag);
+ assertTrue(
+ "mapred.jar is not set properly to the target users directory : "
+ + localizedJobJar, mapredJarFlag);
+ }
+
+ /**
+ * Test task localization on a TT.
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testTaskLocalization()
+ throws IOException,
+ LoginException {
+
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+
+ // ////////// The central method being tested
+ tip.localizeTask(task);
+ // //////////
+
+ // check the functionality of localizeTask
+ for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
+ assertTrue("attempt-dir in localDir " + dir + " is not created!!",
+ new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
+ .toString())).exists());
+ }
+
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ assertTrue("atttempt work dir for " + taskId.toString()
+ + " is not created in any of the configured dirs!!", workDir != null);
+
+ TaskRunner runner = task.createRunner(tracker, tip);
+
+ // /////// Few more methods being tested
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+ // ///////
+
+ // Make sure the task-conf file is created
+ Path localTaskFile =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ assertTrue("Task conf file " + localTaskFile.toString()
+ + " is not created!!", new File(localTaskFile.toUri().getPath())
+ .exists());
+
+ // /////// One more method being tested. This happens in child space.
+ JobConf localizedTaskConf = new JobConf(localTaskFile);
+ TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+ // ///////
+
+ // Make sure that the mapred.local.dir is sandboxed
+ for (String childMapredLocalDir : localizedTaskConf
+ .getStrings("mapred.local.dir")) {
+ assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
+ childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
+ .toString(), taskId.toString(), false)));
+ }
+
+ // Make sure task task.getJobFile is changed and pointed correctly.
+ assertTrue(task.getJobFile().endsWith(
+ TaskTracker
+ .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+
+ // Make sure that the tmp directories are created
+ assertTrue("tmp dir is not created in workDir "
+ + workDir.toUri().getPath(),
+ new File(workDir.toUri().getPath(), "tmp").exists());
+
+ // Make sure that the log are setup properly
+ File logDir =
+ new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+ + task.getTaskID().toString());
+ assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
+ logDir.exists());
+ UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+ checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+
+ File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
+ assertTrue("stdout log file is improper. Expected : "
+ + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
+ expectedStdout.toString().equals(logFiles[0].toString()));
+ File expectedStderr =
+ new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
+ assertTrue("stderr log file is improper. Expected : "
+ + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
+ expectedStderr.toString().equals(logFiles[1].toString()));
+ }
+}