You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:02 UTC
svn commit: r1077111 [2/3] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ conf/
src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/docs/src/documentation/c...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 03:42:01 2011
@@ -24,12 +24,11 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,52 +72,27 @@ class LinuxTaskController extends TaskCo
new File(hadoopBin, "task-controller").getAbsolutePath();
}
- // The list of directory paths specified in the
- // variable mapred.local.dir. This is used to determine
- // which among the list of directories is picked up
- // for storing data for a particular task.
- private String[] mapredLocalDirs;
-
- // permissions to set on files and directories created.
- // When localized files are handled securely, this string
- // will change to something more restrictive. Until then,
- // it opens up the permissions for all, so that the tasktracker
- // and job owners can access files together.
- private static final String FILE_PERMISSIONS = "ugo+rwx";
-
- // permissions to set on components of the path leading to
- // localized files and directories. Read and execute permissions
- // are required for different users to be able to access the
- // files.
- private static final String PATH_PERMISSIONS = "go+rx";
-
public LinuxTaskController() {
super();
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
- //Setting of the permissions of the local directory is done in
- //setup()
- }
-
/**
* List of commands that the setuid script will execute.
*/
enum TaskCommands {
+ INITIALIZE_JOB,
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
}
-
+
/**
* Launch a task JVM that will run as the owner of the job.
*
- * This method launches a task JVM by executing a setuid
- * executable that will switch to the user and run the
- * task.
+ * This method launches a task JVM by executing a setuid executable that will
+ * switch to the user and run the task. Also does initialization of the first
+ * task in the same setuid process launch.
*/
@Override
void launchTaskJVM(TaskController.TaskControllerContext context)
@@ -150,48 +124,103 @@ class LinuxTaskController extends TaskCo
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
- launchTaskJVMArgs, env);
+ launchTaskJVMArgs, env.workDir, env.env);
context.shExec = shExec;
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Exception thrown while launching task JVM : " +
- StringUtils.stringifyException(e));
- LOG.warn("Exit code from task is : " + shExec.getExitCode());
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from task is : " + exitCode);
+ // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
+ // terminated/killed forcefully. In all other cases, log the
+ // task-controller output
+ if (exitCode != 143 && exitCode != 137) {
+ LOG.warn("Exception thrown while launching task JVM : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ }
throw new IOException(e);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
}
}
/**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskCommands,
- * String, List<String>, JvmEnv)} documentation.
+ * Helper method that runs a LinuxTaskController command
+ *
+ * @param taskCommand
+ * @param user
+ * @param cmdArgs
+ * @param env
+ * @throws IOException
+ */
+ private void runCommand(TaskCommands taskCommand, String user,
+ List<String> cmdArgs, File workDir, Map<String, String> env)
+ throws IOException {
+
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+ + shExec.getExitCode());
+ LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+ + " follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+ + " follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+ /**
+ * Returns list of arguments to be passed while initializing a new task. See
+ * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
+ * JvmEnv)} documentation.
+ *
* @param context
* @return Argument to be used while launching Task VM
*/
- private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+ private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
List<String> commandArgs = new ArrayList<String>(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context));
commandArgs.add(jobId);
- if(!context.task.isTaskCleanupTask()) {
+ if (!context.task.isTaskCleanupTask()) {
commandArgs.add(taskId);
- }else {
+ } else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
return commandArgs;
}
-
- // get the Job ID from the information in the TaskControllerContext
+
+ @Override
+ void initializeTask(TaskControllerContext context)
+ throws IOException {
+ LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+ + " for " + context.task.getTaskID().toString());
+ runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+ buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+ }
+
+ private void logOutput(String output) {
+ String shExecOutput = output;
+ if (shExecOutput != null) {
+ for (String str : shExecOutput.split("\n")) {
+ LOG.info(str);
+ }
+ }
+ }
+
private String getJobId(TaskControllerContext context) {
String taskId = context.task.getTaskID().toString();
TaskAttemptID tId = TaskAttemptID.forName(taskId);
@@ -199,6 +228,27 @@ class LinuxTaskController extends TaskCo
return jobId;
}
+ /**
+ * Returns list of arguments to be passed while launching task VM.
+ * See {@code buildTaskControllerExecutor(TaskCommands,
+ * String, List<String>, JvmEnv)} documentation.
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+ List<String> commandArgs = new ArrayList<String>(3);
+ LOG.debug("getting the task directory as: "
+ + getTaskCacheDirectory(context));
+ LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context) );
+ commandArgs.add(getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context));
+ commandArgs.addAll(buildInitializeTaskArgs(context));
+ return commandArgs;
+ }
+
// Get the directory from the list of directories configured
// in mapred.local.dir chosen for storing data pertaining to
// this task.
@@ -208,8 +258,8 @@ class LinuxTaskController extends TaskCo
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
File mapredDir = new File(dir);
- File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
- jobId, taskId, context.task.isTaskCleanupTask()));
+ File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+ jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
if (directory.equals(taskDir)) {
return dir;
}
@@ -219,68 +269,7 @@ class LinuxTaskController extends TaskCo
throw new IllegalArgumentException("invalid task cache directory "
+ directory.getAbsolutePath());
}
-
- /**
- * Setup appropriate permissions for directories and files that
- * are used by the task.
- *
- * As the LinuxTaskController launches tasks as a user, different
- * from the daemon, all directories and files that are potentially
- * used by the tasks are setup with appropriate permissions that
- * will allow access.
- *
- * Until secure data handling is implemented (see HADOOP-4491 and
- * HADOOP-4493, for e.g.), the permissions are set up to allow
- * read, write and execute access for everyone. This will be
- * changed to restricted access as data is handled securely.
- */
- void initializeTask(TaskControllerContext context) {
- // Setup permissions for the job and task cache directories.
- setupTaskCacheFileAccess(context);
- // setup permissions for task log directory
- setupTaskLogFileAccess(context);
- }
-
- // Allows access for the task to create log files under
- // the task log directory
- private void setupTaskLogFileAccess(TaskControllerContext context) {
- TaskAttemptID taskId = context.task.getTaskID();
- File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
- String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
- changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
- }
- // Allows access for the task to read, write and execute
- // the files under the job and task cache directories
- private void setupTaskCacheFileAccess(TaskControllerContext context) {
- String taskId = context.task.getTaskID().toString();
- JobID jobId = JobID.forName(getJobId(context));
- //Change permission for the task across all the disks
- for(String localDir : mapredLocalDirs) {
- File f = new File(localDir);
- File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
- jobId.toString(), taskId, context.task.isTaskCleanupTask()));
- if(taskCacheDir.exists()) {
- changeDirectoryPermissions(taskCacheDir.getPath(),
- FILE_PERMISSIONS, true);
- }
- }//end of local directory Iteration
- }
-
- // convenience method to execute chmod.
- private void changeDirectoryPermissions(String dir, String mode,
- boolean isRecursive) {
- int ret = 0;
- try {
- ret = FileUtil.chmod(dir, mode, isRecursive);
- } catch (Exception e) {
- LOG.warn("Exception in changing permissions for directory " + dir +
- ". Exception: " + e.getMessage());
- }
- if (ret != 0) {
- LOG.warn("Could not change permissions for directory " + dir);
- }
- }
/**
* Builds the command line for launching/terminating/killing task JVM.
* Following is the format for launching/terminating/killing task JVM
@@ -295,14 +284,15 @@ class LinuxTaskController extends TaskCo
* @param command command to be executed.
* @param userName user name
* @param cmdArgs list of extra arguments
+ * @param workDir working directory for the task-controller
* @param env JVM environment variables.
* @return {@link ShellCommandExecutor}
* @throws IOException
*/
- private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
- String userName,
- List<String> cmdArgs, JvmEnv env)
- throws IOException {
+ private ShellCommandExecutor buildTaskControllerExecutor(
+ TaskCommands command, String userName, List<String> cmdArgs,
+ File workDir, Map<String, String> env)
+ throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
taskControllerCmd[0] = getTaskControllerExecutablePath();
taskControllerCmd[1] = userName;
@@ -317,9 +307,9 @@ class LinuxTaskController extends TaskCo
}
}
ShellCommandExecutor shExec = null;
- if(env.workDir != null && env.workDir.exists()) {
+ if(workDir != null && workDir.exists()) {
shExec = new ShellCommandExecutor(taskControllerCmd,
- env.workDir, env.env);
+ workDir, env);
} else {
shExec = new ShellCommandExecutor(taskControllerCmd);
}
@@ -376,66 +366,20 @@ class LinuxTaskController extends TaskCo
return taskControllerExe;
}
- /**
- * Sets up the permissions of the following directories:
- *
- * Job cache directory
- * Archive directory
- * Hadoop log directories
- *
- */
- @Override
- void setup() {
- //set up job cache directory and associated permissions
- String localDirs[] = this.mapredLocalDirs;
- for(String localDir : localDirs) {
- //Cache root
- File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
- File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
- if(!cacheDirectory.exists()) {
- if(!cacheDirectory.mkdirs()) {
- LOG.warn("Unable to create cache directory : " +
- cacheDirectory.getPath());
- }
- }
- if(!jobCacheDirectory.exists()) {
- if(!jobCacheDirectory.mkdirs()) {
- LOG.warn("Unable to create job cache directory : " +
- jobCacheDirectory.getPath());
- }
- }
- //Give world writable permission for every directory under
- //mapred-local-dir.
- //Child tries to write files under it when executing.
- changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
- }//end of local directory manipulations
- //setting up perms for user logs
- File taskLog = TaskLog.getUserLogDir();
- changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+ private List<String> buildInitializeJobCommandArgs(
+ JobInitializationContext context) {
+ List<String> initJobCmdArgs = new ArrayList<String>();
+ initJobCmdArgs.add(context.jobid.toString());
+ return initJobCmdArgs;
}
- /*
- * Create Job directories across disks and set their permissions to 777
- * This way when tasks are run we just need to setup permissions for
- * task folder.
- */
@Override
- void initializeJob(JobID jobid) {
- for(String localDir : this.mapredLocalDirs) {
- File jobDirectory = new File(localDir,
- TaskTracker.getLocalJobDir(jobid.toString()));
- if(!jobDirectory.exists()) {
- if(!jobDirectory.mkdir()) {
- LOG.warn("Unable to create job cache directory : "
- + jobDirectory.getPath());
- continue;
- }
- }
- //Should be recursive because the jar and work folders might be
- //present under the job cache directory
- changeDirectoryPermissions(
- jobDirectory.getPath(), FILE_PERMISSIONS, true);
- }
+ void initializeJob(JobInitializationContext context)
+ throws IOException {
+ LOG.debug("Going to initialize job " + context.jobid.toString()
+ + " on the TT");
+ runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+ buildInitializeJobCommandArgs(context), context.workDir, null);
}
/**
@@ -470,7 +414,7 @@ class LinuxTaskController extends TaskCo
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env);
+ buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
try {
shExec.execute();
} catch (Exception e) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 03:42:01 2011
@@ -21,12 +21,17 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobTrackerMetricsInst;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -62,7 +67,7 @@ class LocalJobRunner implements JobSubmi
private JobStatus status;
private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
- private MapOutputFile mapoutputFile;
+
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
@@ -83,8 +88,6 @@ class LocalJobRunner implements JobSubmi
this.systemJobDir = new Path(jobSubmitDir);
this.file = new Path(systemJobDir, "job.xml");
this.id = jobid;
- this.mapoutputFile = new MapOutputFile(jobid);
- this.mapoutputFile.setConf(conf);
this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
this.localFs = FileSystem.getLocal(conf);
@@ -120,7 +123,9 @@ class LocalJobRunner implements JobSubmi
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
-
+
+ Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
+ new HashMap<TaskAttemptID, MapOutputFile>();
for (int i = 0; i < taskSplitMetaInfos.length; i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);
@@ -129,6 +134,12 @@ class LocalJobRunner implements JobSubmi
mapId, i,
taskSplitMetaInfos[i].getSplitIndex(), 1);
JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+ MapOutputFile mapOutput = new MapOutputFile();
+ mapOutput.setConf(localConf);
+ mapOutputFiles.put(mapId, mapOutput);
+
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
map.setConf(localConf);
@@ -146,14 +157,21 @@ class LocalJobRunner implements JobSubmi
new TaskAttemptID(new TaskID(jobId, false, 0), 0);
try {
if (numReduceTasks > 0) {
+ ReduceTask reduce =
+ new ReduceTask(file.toString(), reduceId, 0, mapIds.size(),
+ 1);
+ JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(
- mapId.getTaskID(),reduceId,
- localFs.getLength(mapOut));
+ Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+ MapOutputFile localOutputFile = new MapOutputFile();
+ localOutputFile.setConf(localConf);
+ Path reduceIn =
+ localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+ localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -165,10 +183,6 @@ class LocalJobRunner implements JobSubmi
}
}
if (!this.isInterrupted()) {
- ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size(),
- 1);
- JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
@@ -183,11 +197,8 @@ class LocalJobRunner implements JobSubmi
}
}
} finally {
- for (TaskAttemptID mapId: mapIds) {
- this.mapoutputFile.removeAll(mapId);
- }
- if (numReduceTasks == 1) {
- this.mapoutputFile.removeAll(reduceId);
+ for (MapOutputFile output : mapOutputFiles.values()) {
+ output.removeAll();
}
}
// delete the temporary directory in output directory
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Fri Mar 4 03:42:01 2011
@@ -30,144 +30,152 @@ import org.apache.hadoop.fs.Path;
class MapOutputFile {
private JobConf conf;
- private JobID jobId;
-
- MapOutputFile() {
- }
- MapOutputFile(JobID jobId) {
- this.jobId = jobId;
+ static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+ MapOutputFile() {
}
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
- /** Return the path to local map output file created earlier
- * @param mapTaskId a map task id
- */
- public Path getOutputFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", conf);
+ /**
+ * Return the path to local map output file created earlier
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", conf);
}
- /** Create a local map output file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", size, conf);
- }
-
- /** Return the path to a local map output index file created earlier
- * @param mapTaskId a map task id
- */
- public Path getOutputIndexFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index", conf);
- }
-
- /** Create a local map output index file name.
- * @param mapTaskId a map task id
+ public Path getOutputFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", size, conf);
+ }
+
+ /**
+ * Return the path to a local map output index file created earlier
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputIndexFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", conf);
+ }
+
+ /**
+ * Create a local map output index file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index",
- size, conf);
+ public Path getOutputIndexFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", size, conf);
}
- /** Return a local map spill file created earlier.
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill file created earlier.
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill"
- + spillNumber + ".out", conf);
+ public Path getSpillFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", conf);
}
- /** Create a local map spill file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out", size, conf);
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", size, conf);
}
- /** Return a local map spill index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill index file created earlier
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out.index", conf);
+ public Path getSpillIndexFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", conf);
}
- /** Create a local map spill index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill index file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" + spillNumber +
- ".out.index", size, conf);
- }
-
- /** Return a local reduce input file created earlier
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
- */
- public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId + ".out",
- conf);
- }
-
- /** Create a local reduce input file name.
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", size, conf);
+ }
+
+ /**
+ * Return a local reduce input file created earlier
+ *
+ * @param mapId a map task id
+ * @return path
+ * @throws IOException
+ */
+ public Path getInputFile(int mapId)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+ .valueOf(mapId)), conf);
+ }
+
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId,
- long size)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId.getId() + ".out",
- size, conf);
+ public Path getInputFileForWrite(TaskID mapId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+ size, conf);
}
/** Removes all of the files related to a task. */
- public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), taskId.toString())
-);
+ public void removeAll()
+ throws IOException {
+ conf.deleteLocalFiles(TaskTracker.OUTPUT);
}
public void setConf(Configuration conf) {
@@ -177,9 +185,4 @@ class MapOutputFile {
this.conf = new JobConf(conf);
}
}
-
- public void setJobId(JobID jobId) {
- this.jobId = jobId;
- }
-
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar 4 03:42:01 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -77,7 +78,6 @@ class MapTask extends Task {
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
private String splitClass;
@@ -106,12 +106,20 @@ class MapTask extends Task {
}
@Override
- public void localizeConfiguration(JobConf conf) throws IOException {
+ public void localizeConfiguration(JobConf conf)
+ throws IOException {
super.localizeConfiguration(conf);
+ // split.info file is used only by IsolationRunner.
+ // Write the split file to the local disk if it is a normal map task (not a
+ // job-setup or a job-cleanup task) and if the user wishes to run
+ // IsolationRunner either by setting keep.failed.tasks.files to true or by
+ // using keep.tasks.files.pattern
if (supportIsolationRunner(conf) && isMapOrReduce()) {
// localize the split meta-information
- Path localSplitMeta = new Path(new Path(getJobFile()).getParent(),
- "split.info");
+ Path localSplitMeta =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+ TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+ .toString()), conf);
LOG.debug("Writing local split to " + localSplitMeta);
DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
splitMetaInfo.write(out);
@@ -1228,8 +1236,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
@@ -1293,9 +1301,9 @@ class MapTask extends Task {
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1321,8 +1329,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -1358,9 +1366,9 @@ class MapTask extends Task {
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1450,14 +1458,14 @@ class MapTask extends Task {
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
+ filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+ rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
@@ -1468,7 +1476,7 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1476,10 +1484,10 @@ class MapTask extends Task {
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Fri Mar 4 03:42:01 2011
@@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner {
return false;
}
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:42:01 2011
@@ -213,7 +213,7 @@ class ReduceTask extends Task {
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
@@ -1287,12 +1287,11 @@ class ReduceTask extends Task {
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
- reduceId.getJobID().toString(),
- reduceId.toString())
- + "/map_" +
- loc.getTaskId().getId() + ".out");
-
+ Path filename =
+ new Path(String.format(
+ MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+ TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
@@ -2350,8 +2349,8 @@ class ReduceTask extends Task {
if (numMemDiskSegments > 0 &&
ioSortFactor > mapOutputFilesOnDisk.size()) {
// must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), inMemToDiskBytes);
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null);
@@ -2649,8 +2648,8 @@ class ReduceTask extends Task {
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), mergeOutputSize);
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Mar 4 03:42:01 2011
@@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunne
}
// cleanup from failures
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
@@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunne
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:42:01 2011
@@ -179,7 +179,6 @@ abstract public class Task implements Wr
TaskStatus.Phase.MAP :
TaskStatus.Phase.SHUFFLE,
counters);
- this.mapOutputFile.setJobId(taskId.getJobID());
spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
}
@@ -405,7 +404,6 @@ abstract public class Task implements Wr
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
- this.mapOutputFile.setJobId(taskId.getJobID());
skipRanges.readFields(in);
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 03:42:01 2011
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -45,27 +49,95 @@ abstract class TaskController implements
public Configuration getConf() {
return conf;
}
-
+
+ // The list of directory paths specified in the variable mapred.local.dir.
+ // This is used to determine which among the list of directories is picked up
+ // for storing data for a particular task.
+ protected String[] mapredLocalDirs;
+
public void setConf(Configuration conf) {
this.conf = conf;
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
-
+
+ /**
+  * Prepares the directories that tasks rely on. On every disk listed in
+  * mapred.local.dir this covers:
+  * <ul>
+  * <li>the mapred-local directory itself</li>
+  * <li>the distributed cache directory</li>
+  * <li>the job cache directory</li>
+  * </ul>
+  * It also covers the user log directory ({@link TaskLog#getUserLogDir()}).
+  * Each directory is created if missing and then given
+  * {@code PermissionsHandler.sevenFiveFive} permissions. A directory that
+  * cannot be created is logged and skipped.
+  */
+ void setup() {
+   if (this.mapredLocalDirs == null) {
+     // setConf() found no mapred.local.dir; there are no per-disk dirs to set up.
+     LOG.warn("mapred.local.dir is not configured; skipping setup of local"
+         + " directories");
+   } else {
+     for (String localDir : this.mapredLocalDirs) {
+       createDirWithPermissions(new File(localDir), "mapred-local directory");
+       createDirWithPermissions(
+           new File(localDir, TaskTracker.getDistributedCacheDir()),
+           "cache directory");
+       createDirWithPermissions(
+           new File(localDir, TaskTracker.getJobCacheSubdir()),
+           "job cache directory");
+     }
+   }
+   createDirWithPermissions(TaskLog.getUserLogDir(), "taskLog directory");
+ }
+
+ /**
+  * Creates {@code dir} (including parents) when it does not exist, then
+  * applies {@code PermissionsHandler.sevenFiveFive} to it. If the directory
+  * cannot be created, a warning naming {@code description} is logged and no
+  * permissions are changed.
+  */
+ private void createDirWithPermissions(File dir, String description) {
+   if (!dir.exists() && !dir.mkdirs()) {
+     LOG.warn("Unable to create " + description + " : " + dir.getPath());
+   } else {
+     PermissionsHandler.setPermissions(dir, PermissionsHandler.sevenFiveFive);
+   }
+ }
+
/**
- * Setup task controller component.
+ * Take task-controller specific actions to initialize job. This involves
+ * setting appropriate permissions to job-files so as to secure the files to
+ * be accessible only by the user's tasks.
*
+ * @throws IOException
*/
- abstract void setup();
-
-
+ abstract void initializeJob(JobInitializationContext context) throws IOException;
+
/**
* Launch a task JVM
*
- * This method defines how a JVM will be launched to run a task.
+ * This method defines how a JVM will be launched to run a task. Each
+ * task-controller should also do an
+ * {@link #initializeTask(TaskControllerContext)} inside this method so as to
+ * initialize the task before launching it. This is for reasons of
+ * task-controller specific optimizations w.r.t combining initialization and
+ * launching of tasks.
+ *
* @param context the context associated to the task
*/
abstract void launchTaskJVM(TaskControllerContext context)
throws IOException;
-
+
/**
* Top level cleanup a task JVM method.
*
@@ -90,47 +162,44 @@ abstract class TaskController implements
}
killTask(context);
}
-
- /**
- * Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, PID and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context);
-
-
+
+ /**
+  * Perform initializing actions required before a task can run.
+  *
+  * For instance, this method can be used to set up appropriate
+  * access permissions for files and directories that will be
+  * used by tasks. Tasks use the job cache, log, and distributed cache
+  * directories and files as part of their functioning. Typically,
+  * these files are shared between the daemon and the tasks
+  * themselves. So, a TaskController that is launching tasks
+  * as different users can implement this method to set up
+  * appropriate ownership and permissions for these directories
+  * and files.
+  *
+  * @param context the context associated to the task being initialized
+  * @throws IOException if the task-controller specific initialization fails
+  */
+ abstract void initializeTask(TaskControllerContext context)
+ throws IOException;
+
/**
* Contains task information required for the task controller.
*/
static class TaskControllerContext {
// task being executed
- Task task;
- // the JVM environment for the task
- JvmEnv env;
- // the Shell executor executing the JVM for this task
- ShellCommandExecutor shExec;
- // process handle of task JVM
- String pid;
- // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- long sleeptimeBeforeSigkill;
+ Task task;
+ ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
+
+ // Information used only when this context is used for launching new tasks.
+ JvmEnv env; // the JVM environment for the task.
+
+ // Information used only when this context is used for destroying a task jvm.
+ String pid; // process handle of task JVM.
+ long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+ }
+
+ /**
+  * Job information handed to
+  * {@link TaskController#initializeJob(JobInitializationContext)}.
+  */
+ static class JobInitializationContext {
+   // Identifier of the job being initialized.
+   JobID jobid;
+   // Working directory associated with the job (presumably a localized
+   // job directory on a local disk; TODO confirm against callers).
+   File workDir;
+   // User on whose behalf the job is initialized (assumed to be the job
+   // owner; verify against callers).
+   String user;
  }
- /**
- * Method which is called after the job is localized so that task controllers
- * can implement their own job localization logic.
- *
- * @param tip Task of job for which localization happens.
- */
- abstract void initializeJob(JobID jobId);
-
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
*
@@ -144,6 +213,5 @@ abstract class TaskController implements
*
* @param context task context
*/
-
abstract void killTask(TaskControllerContext context);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar 4 03:42:01 2011
@@ -57,9 +57,10 @@ public class TaskLog {
private static final Log LOG =
LogFactory.getLog(TaskLog.class);
+ static final String USERLOGS_DIR_NAME = "userlogs";
+
private static final File LOG_DIR =
- new File(System.getProperty("hadoop.log.dir"),
- "userlogs").getAbsoluteFile();
+ new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
// localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
@@ -178,7 +179,11 @@ public class TaskLog {
return new File(getBaseDir(taskid), "log.index");
}
}
-
+
+ static String getBaseLogDir() {
+ return System.getProperty("hadoop.log.dir");
+ }
+
static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 03:42:01 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.JobContext;
@@ -78,7 +79,7 @@ abstract class TaskRunner extends Thread
this.t = tip.getTask();
this.tracker = tracker;
this.conf = conf;
- this.mapOutputFile = new MapOutputFile(t.getJobID());
+ this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
this.jvmManager = tracker.getJvmManagerInstance();
}
@@ -125,223 +126,41 @@ abstract class TaskRunner extends Thread
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ // We don't create any symlinks yet, so presence/absence of workDir
+ // actually on the file system doesn't matter.
setupDistributedCache(lDirAlloc, workDir, archives, files);
-
+
+ // Set up the child task's configuration. After this call, no localization
+ // of files should happen in the TaskTracker's process space. Any changes to
+ // the conf object after this will NOT be reflected to the child.
+ setupChildTaskConfiguration(lDirAlloc);
+
if (!prepare()) {
return;
}
-
- // Accumulates class paths for child.
- List<String> classPaths = new ArrayList<String>();
- // start with same classpath as parent process
- appendSystemClasspaths(classPaths);
-
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
- }
-
- // include the user specified classpath
- appendJobJarClasspaths(conf.getJar(), classPaths);
-
- // Distributed cache paths
- appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-
- // Include the working dir too
- classPaths.add(workDir.toString());
// Build classpath
+ List<String> classPaths = getClassPaths(conf, workDir, archives, files);
+ long logSize = TaskLog.getTaskLogLength(conf);
// Build exec child JVM args.
- Vector<String> vargs = new Vector<String>(8);
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
-
- vargs.add(jvm.toString());
-
- // Add child (task) java-vm options.
- //
- // The following symbols if present in mapred.child.java.opts value are
- // replaced:
- // + @taskid@ is interpolated with value of TaskID.
- // Other occurrences of @ will not be altered.
- //
- // Example with multiple arguments and substitutions, showing
- // jvm GC logging, and start of a passwordless JVM JMX agent so can
- // connect with jconsole and the likes to watch child memory, threads
- // and get thread dumps.
- //
- // <property>
- // <name>mapred.child.java.opts</name>
- // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
- // -Dcom.sun.management.jmxremote.authenticate=false \
- // -Dcom.sun.management.jmxremote.ssl=false \
- // </value>
- // </property>
- //
- String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", taskid.toString());
- String [] javaOptsSplit = javaOpts.split(" ");
-
- // Add java.library.path; necessary for loading native libraries.
- //
- // 1. To support native-hadoop library i.e. libhadoop.so, we add the
- // parent processes' java.library.path to the child.
- // 2. We also add the 'cwd' of the task to it's java.library.path to help
- // users distribute native libraries via the DistributedCache.
- // 3. The user can also specify extra paths to be added to the
- // java.library.path via mapred.child.java.opts.
- //
- String libraryPath = System.getProperty("java.library.path");
- if (libraryPath == null) {
- libraryPath = workDir.getAbsolutePath();
- } else {
- libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
- }
- boolean hasUserLDPath = false;
- for(int i=0; i<javaOptsSplit.length ;i++) {
- if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
- javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
- hasUserLDPath = true;
- break;
- }
- }
- if(!hasUserLDPath) {
- vargs.add("-Djava.library.path=" + libraryPath);
- }
- for (int i = 0; i < javaOptsSplit.length; i++) {
- vargs.add(javaOptsSplit[i]);
- }
-
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
-
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- }
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
-
- // Add classpath.
- vargs.add("-classpath");
- String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
- vargs.add(classPath);
-
- // Setup the log4j prop
- long logSize = TaskLog.getTaskLogLength(conf);
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
- if (conf.getProfileEnabled()) {
- if (conf.getProfileTaskRange(t.isMapTask()
- ).isIncluded(t.getPartition())) {
- File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
- vargs.add(String.format(conf.getProfileParams(), prof.toString()));
- }
- }
-
- // Add main class and its arguments
- vargs.add(Child.class.getName()); // main of Child
- // pass umbilical address
- InetSocketAddress address = tracker.getTaskTrackerReportAddress();
- vargs.add(address.getAddress().getHostAddress());
- vargs.add(Integer.toString(address.getPort()));
- vargs.add(taskid.toString()); // pass task identifier
+ Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
- List<String> setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList<String>();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
- }
-
+ List<String> setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
- File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
- File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
- stdout.getParentFile().mkdirs();
- tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
-
+ File[] logFiles = prepareLogFiles(taskid);
+ File stdout = logFiles[0];
+ File stderr = logFiles[1];
+ tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
+ stderr);
+
Map<String, String> env = new HashMap<String, String>();
- StringBuffer ldLibraryPath = new StringBuffer();
- ldLibraryPath.append(workDir.toString());
- String oldLdLibraryPath = null;
- oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
- if (oldLdLibraryPath != null) {
- ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
- ldLibraryPath.append(oldLdLibraryPath);
- }
- env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-
- String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
- LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
- env.put("JOB_TOKEN_FILE", jobTokenFile);
-
- // for the child of task jvm, set hadoop.root.logger
- env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
- String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
- if (hadoopClientOpts == null) {
- hadoopClientOpts = "";
- } else {
- hadoopClientOpts = hadoopClientOpts + " ";
- }
- hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
- + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
- env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
-
- // add the env variables passed by the user
- String mapredChildEnv = conf.get("mapred.child.env");
- if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
- String childEnvs[] = mapredChildEnv.split(",");
- for (String cEnv : childEnvs) {
- try {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
- if (value != null) {
- // replace $env with the child's env constructed by tt's
- // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // this key is not configured by the tt for the child .. get it
- // from the tt's env
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- env.put(parts[0], value);
- } catch (Throwable t) {
- // set the error msg
- errorInfo = "Invalid User environment settings : " + mapredChildEnv
- + ". Failed to parse user-passed environment param."
- + " Expecting : env1=value1,env2=value2...";
- LOG.warn(errorInfo);
- throw t;
- }
- }
- }
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env, taskid,
+ logSize);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -369,7 +188,7 @@ abstract class TaskRunner extends Thread
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
} catch (Throwable throwable) {
- LOG.warn(t.getTaskID() + errorInfo, throwable);
+ LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
Throwable causeThrowable = new Throwable(errorInfo, throwable);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
causeThrowable.printStackTrace(new PrintStream(baos));
@@ -404,15 +223,343 @@ abstract class TaskRunner extends Thread
}
}
+  /**
+   * Prepare the log files for the task.
+   *
+   * Creates the parent directory of the task's stdout log file when it does
+   * not exist yet and, only when this call created it, applies
+   * PermissionsHandler.sevenZeroZero to it.
+   *
+   * @param taskid the task attempt whose log files are prepared
+   * @return an array of files. The first file is stdout, the second is stderr.
+   */
+  static File[] prepareLogFiles(TaskAttemptID taskid) {
+    File[] logFiles = new File[2];
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    File logDir = logFiles[0].getParentFile();
+    if (logDir.mkdirs()) {
+      // Only tighten permissions on a directory this call created.
+      PermissionsHandler.setPermissions(logDir,
+          PermissionsHandler.sevenZeroZero);
+    } else if (!logDir.isDirectory()) {
+      // File.mkdirs() also returns false when the directory already exists;
+      // warn only when it is genuinely missing.
+      LOG.warn("mkdirs failed for " + logDir + ". Ignoring");
+    }
+    return logFiles;
+  }
+
+  /**
+   * Writes the child's task configuration to local disk and records that
+   * location as the task's job file, so the child picks up the correct
+   * job.xml from there.
+   *
+   * @param lDirAlloc allocator that chooses the local path for the file
+   * @throws IOException if the path cannot be allocated or the file written
+   */
+  void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+      throws IOException {
+    String jobId = t.getJobID().toString();
+    String taskId = t.getTaskID().toString();
+    String relativeConfFile =
+        TaskTracker.getTaskConfFile(jobId, taskId, t.isTaskCleanupTask());
+    String taskConfPath =
+        lDirAlloc.getLocalPathForWrite(relativeConfFile, conf).toString();
+
+    // Persist the configuration first, then point the task at it.
+    writeLocalTaskFile(taskConfPath, conf);
+    t.setJobFile(taskConfPath);
+  }
+
+  /**
+   * Builds the setup command that applies the ulimit memory limit for the
+   * child JVM.
+   *
+   * @return the words of Shell.getUlimitMemoryCommand(conf) copied into a
+   *         new mutable list, or null when that command is null
+   */
+  private List<String> getVMSetupCmd() {
+    String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+    if (ulimitCmd == null) {
+      return null;
+    }
+    List<String> setupCmd = new ArrayList<String>();
+    for (int i = 0; i < ulimitCmd.length; i++) {
+      setupCmd.add(ulimitCmd[i]);
+    }
+    return setupCmd;
+  }
+
+  /**
+   * Builds the command line used to launch the child (task) JVM.
+   *
+   * @param taskid the task attempt being launched
+   * @param workDir the task's working directory; appended to
+   *        java.library.path and used to resolve a relative mapred.child.tmp
+   * @param classPaths classpath entries, joined with SYSTEM_PATH_SEPARATOR
+   * @param logSize value passed as -Dhadoop.tasklog.totalLogFileSize
+   * @return the java executable, JVM options, the Child main class and its
+   *         arguments (umbilical host, port and task id)
+   * @throws IOException if the child's temporary directory cannot be created
+   */
+  private Vector<String> getVMArgs(TaskAttemptID taskid, File workDir,
+      List<String> classPaths, long logSize)
+      throws IOException {
+    Vector<String> vargs = new Vector<String>(8);
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+    vargs.add(jvm.toString());
+
+    // Add child (task) java-vm options.
+    //
+    // The following symbols if present in mapred.child.java.opts value are
+    // replaced:
+    // + @taskid@ is interpolated with value of TaskID.
+    // Other occurrences of @ will not be altered.
+    //
+    // Example with multiple arguments and substitutions, showing
+    // jvm GC logging, and start of a passwordless JVM JMX agent so can
+    // connect with jconsole and the likes to watch child memory, threads
+    // and get thread dumps.
+    //
+    //  <property>
+    //    <name>mapred.child.java.opts</name>
+    //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc
+    //       -Dcom.sun.management.jmxremote.authenticate=false
+    //       -Dcom.sun.management.jmxremote.ssl=false
+    //    </value>
+    //  </property>
+    //
+    // Options are separated by any run of whitespace (spaces, tabs, or the
+    // newlines of a multi-line value like the one above), so repeated
+    // separators never turn into empty or whitespace-laden JVM arguments.
+    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+    String [] javaOptsSplit = javaOpts.trim().split("\\s+");
+
+    // Add java.library.path; necessary for loading native libraries.
+    //
+    // 1. To support native-hadoop library i.e. libhadoop.so, we add the
+    //    parent processes' java.library.path to the child.
+    // 2. We also add the 'cwd' of the task to it's java.library.path to help
+    //    users distribute native libraries via the DistributedCache.
+    // 3. The user can also specify extra paths to be added to the
+    //    java.library.path via mapred.child.java.opts.
+    //
+    String libraryPath = System.getProperty("java.library.path");
+    if (libraryPath == null) {
+      libraryPath = workDir.getAbsolutePath();
+    } else {
+      libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+    }
+    boolean hasUserLDPath = false;
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      if (javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+        // Keep the user's entries first, then append the computed path.
+        javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+        hasUserLDPath = true;
+        break;
+      }
+    }
+    if (!hasUserLDPath) {
+      vargs.add("-Djava.library.path=" + libraryPath);
+    }
+    for (String opt : javaOptsSplit) {
+      // A blank mapred.child.java.opts splits into a single "" token; never
+      // hand an empty argument to the JVM.
+      if (opt.length() > 0) {
+        vargs.add(opt);
+      }
+    }
+
+    Path childTmpDir = createChildTmpDir(workDir, conf);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // Add classpath.
+    vargs.add("-classpath");
+    String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+    vargs.add(classPath);
+
+    // Setup the log4j prop
+    vargs.add("-Dhadoop.log.dir=" +
+        new File(System.getProperty("hadoop.log.dir")
+        ).getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=INFO,TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(t.isMapTask()
+          ).isIncluded(t.getPartition())) {
+        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+      }
+    }
+
+    // Add main class and its arguments
+    vargs.add(Child.class.getName());  // main of Child
+    // pass umbilical address
+    InetSocketAddress address = tracker.getTaskTrackerReportAddress();
+    vargs.add(address.getAddress().getHostAddress());
+    vargs.add(Integer.toString(address.getPort()));
+    vargs.add(taskid.toString());      // pass task identifier
+    return vargs;
+  }
+
+  /**
+   * Resolves the child's temporary directory from mapred.child.tmp
+   * (default "./tmp"). A relative value is resolved against workDir and
+   * created on the local file system; an absolute value is returned as-is
+   * and is not created here.
+   *
+   * @param workDir the task's working directory
+   * @param conf the job configuration supplying mapred.child.tmp
+   * @return the path to use as the child's java.io.tmpdir
+   * @throws IOException if a relative temp directory cannot be created
+   */
+  static Path createChildTmpDir(File workDir,
+      JobConf conf)
+      throws IOException {
+
+    // add java.io.tmpdir given by mapred.child.tmp
+    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    Path tmpDir = new Path(tmp);
+
+    // if temp directory path is not absolute, prepend it with workDir.
+    if (!tmpDir.isAbsolute()) {
+      tmpDir = new Path(workDir.toString(), tmp);
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      // Accept an already-existing directory. Checking exists() before
+      // getFileStatus() makes a failed mkdirs report the error below rather
+      // than a bare FileNotFoundException.
+      if (!localFs.mkdirs(tmpDir)
+          && !(localFs.exists(tmpDir) && localFs.getFileStatus(tmpDir).isDir())) {
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+    return tmpDir;
+  }
+
+  /**
+   * Assembles the child JVM's classpath entries, in order: this JVM's
+   * java.class.path, the entries added by appendJobJarClasspaths for
+   * conf.getJar(), the distributed-cache classpath entries, and finally the
+   * task working directory.
+   *
+   * @param conf job configuration
+   * @param workDir task working directory, appended last
+   * @param archives distributed cache archives, passed through
+   * @param files distributed cache files, passed through
+   * @return the ordered list of classpath entries
+   * @throws IOException propagated from the distributed-cache helper
+   */
+  private static List<String> getClassPaths(JobConf conf, File workDir,
+      URI[] archives, URI[] files)
+      throws IOException {
+    List<String> entries = new ArrayList<String>();
+    appendSystemClasspaths(entries);
+    appendJobJarClasspaths(conf.getJar(), entries);
+    appendDistributedCacheClasspaths(conf, archives, files, entries);
+    entries.add(workDir.toString());
+    return entries;
+  }
+
+  /**
+   * Populates the environment of the child (task) JVM.
+   *
+   * Sets LD_LIBRARY_PATH (workDir, followed by this process's
+   * LD_LIBRARY_PATH when set), JOB_TOKEN_FILE, HADOOP_ROOT_LOGGER and a
+   * quoted HADOOP_CLIENT_OPTS. It then applies the user's mapred.child.env
+   * setting, a list of comma-separated name=value pairs. Within a user value,
+   * "$name" is replaced by the value already in env, otherwise by this
+   * process's environment variable of that name, otherwise by "".
+   *
+   * @param errorInfo the caller's current error message
+   * @param workDir the task working directory
+   * @param conf the job configuration
+   * @param env the environment map to populate; modified in place
+   * @param taskid the task attempt, passed via HADOOP_CLIENT_OPTS
+   * @param logSize passed via HADOOP_CLIENT_OPTS as
+   *        hadoop.tasklog.totalLogFileSize
+   * @return errorInfo, unchanged
+   * @throws IllegalArgumentException if mapred.child.env cannot be parsed;
+   *         the message describes the expected format and the cause is the
+   *         underlying failure
+   * @throws Throwable kept in the signature for existing callers
+   */
+  private static String getVMEnvironment(String errorInfo, File workDir,
+      JobConf conf, Map<String, String> env, TaskAttemptID taskid,
+      long logSize) throws Throwable {
+    StringBuilder ldLibraryPath = new StringBuilder(workDir.toString());
+    String oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+    if (oldLdLibraryPath != null) {
+      ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+      ldLibraryPath.append(oldLdLibraryPath);
+    }
+    env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+    // NOTE(review): jobTokenFile is null when the property is unset; confirm
+    // that whatever consumes env accepts a null value.
+    env.put("JOB_TOKEN_FILE", jobTokenFile);
+
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+        + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
+    // add the env variables passed by the user
+    String mapredChildEnv = conf.get("mapred.child.env");
+    if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+      for (String cEnv : mapredChildEnv.split(",")) {
+        try {
+          // Split on the first '=' only so values may themselves contain '='
+          // (e.g. JAVA_OPTS=-Dkey=value); split("=") silently truncated them.
+          String[] parts = cEnv.split("=", 2);
+          if (parts.length != 2) {
+            throw new IllegalArgumentException("Missing '=' in: " + cEnv);
+          }
+          String name = parts[0];
+          // "$name" expands to the value configured by the tt for the child,
+          // else the tt's own environment, else the empty string, e.g.
+          // LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp, PATH=$PATH:/tmp, X=/tmp
+          String current = env.get(name);
+          if (current == null) {
+            current = System.getenv(name);
+          }
+          if (current == null) {
+            current = "";
+          }
+          env.put(name, parts[1].replace("$" + name, current));
+        } catch (RuntimeException e) {
+          // set the error msg
+          errorInfo = "Invalid User environment settings : " + mapredChildEnv
+              + ". Failed to parse user-passed environment param."
+              + " Expecting : env1=value1,env2=value2...";
+          LOG.warn(errorInfo);
+          // The return statement is skipped when we throw, so the caller's
+          // errorInfo would never receive this message; carry it in the
+          // exception, keeping the original failure as the cause.
+          throw new IllegalArgumentException(errorInfo, e);
+        }
+      }
+    }
+    return errorInfo;
+  }
+
+  /**
+   * Writes the task-specific job configuration file.
+   *
+   * Anything already at jobFile is deleted first (delete is called with
+   * recursive=true), then conf is serialized to a newly created file there
+   * with conf.writeXml. The output stream is always closed.
+   *
+   * @param jobFile local path of the configuration file to write
+   * @param conf configuration to serialize; also used to obtain the local
+   *        file system
+   * @throws IOException if the file cannot be deleted, created or written
+   */
+  private static void writeLocalTaskFile(String jobFile, JobConf conf)
+      throws IOException {
+    Path localTaskFile = new Path(jobFile);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(localTaskFile, true);
+    OutputStream out = localFs.create(localTaskFile);
+    try {
+      conf.writeXml(out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
+   * Whenever it uses LocalDirAllocator from now on inside the child, it will
+   * only see files inside the attempt-directory. This is done in the Child's
+   * process space.
+   *
+   * Each configured local dir is suffixed with the attempt's local task
+   * directory, and the comma-joined result replaces mapred.local.dir in conf.
+   *
+   * @param t the task whose attempt directory is appended to each local dir
+   * @param conf configuration whose mapred.local.dir is rewritten in place
+   */
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+    String[] localDirs = conf.getStrings("mapred.local.dir");
+    String attemptDir = TaskTracker.getLocalTaskDir(t.getJobID().toString(),
+        t.getTaskID().toString(), t.isTaskCleanupTask());
+    StringBuilder childDirs = new StringBuilder();
+    childDirs.append(localDirs[0]).append(Path.SEPARATOR).append(attemptDir);
+    for (int i = 1; i < localDirs.length; i++) {
+      childDirs.append(',').append(localDirs[i]).append(Path.SEPARATOR)
+          .append(attemptDir);
+    }
+    LOG.debug("mapred.local.dir for child : " + childDirs);
+    conf.set("mapred.local.dir", childDirs.toString());
+  }
+
/** Creates the working directory pathname for a task attempt. */
static File formWorkDir(LocalDirAllocator lDirAlloc,
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.toString(), isCleanup)
- + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
- return workDir;
+ // Resolve the attempt's relative work-dir path (from
+ // TaskTracker.getTaskWorkDir) through lDirAlloc's configured local dirs.
+ // NOTE(review): getLocalPathToRead presumably locates an existing
+ // directory rather than creating one -- confirm it was created earlier.
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.toString(), isCleanup), conf);
+
+ return new File(workDir.toString());
}
private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
@@ -431,7 +578,7 @@ abstract class TaskRunner extends Thread
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -457,7 +604,7 @@ abstract class TaskRunner extends Thread
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -474,20 +621,12 @@ abstract class TaskRunner extends Thread
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
- Path localTaskFile = new Path(t.getJobFile());
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
}
}
- private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives,
- URI[] files, List<String> classPaths) throws IOException {
+ private static void appendDistributedCacheClasspaths(JobConf conf,
+ URI[] archives, URI[] files, List<String> classPaths)
+ throws IOException {
// Archive paths
Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClasspaths != null && archives != null) {
@@ -522,8 +661,9 @@ abstract class TaskRunner extends Thread
}
}
- private void appendSystemClasspaths(List<String> classPaths) {
- for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+ /**
+ * Appends each entry of this JVM's java.class.path property, split on
+ * SYSTEM_PATH_SEPARATOR, to classPaths in order.
+ */
+ private static void appendSystemClasspaths(List<String> classPaths) {
+ for (String c : System.getProperty("java.class.path").split(
+ SYSTEM_PATH_SEPARATOR)) {
classPaths.add(c);
}
}
@@ -605,19 +745,8 @@ abstract class TaskRunner extends Thread
// Do not exit even if symlinks have not been created.
LOG.warn(StringUtils.stringifyException(ie));
}
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+ createChildTmpDir(workDir, conf);
}
/**