You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC
svn commit: r1077679 [4/6] - in
/hadoop/common/branches/branch-0.20-security-patches: ./
src/c++/task-controller/ src/c++/task-controller/impl/
src/c++/task-controller/test/ src/c++/task-controller/tests/
src/core/org/apache/hadoop/fs/ src/core/org/apa...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar 4 04:43:33 2011
@@ -17,27 +17,23 @@
*/
package org.apache.hadoop.mapred;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree.Signal;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
/**
* A {@link TaskController} that runs the task JVMs as the user
@@ -62,22 +58,22 @@ class LinuxTaskController extends TaskCo
private static final Log LOG =
LogFactory.getLog(LinuxTaskController.class);
-
- // Name of the executable script that will contain the child
- // JVM command line. See writeCommand for details.
- private static final String COMMAND_FILE = "taskjvm.sh";
// Path to the setuid executable.
- private static String taskControllerExe;
+ private String taskControllerExe;
+ private static final String TASK_CONTROLLER_EXEC_KEY =
+ "mapreduce.tasktracker.task-controller.exe";
- static {
- // the task-controller is expected to be under the $HADOOP_HOME/bin
- // directory.
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
- taskControllerExe =
+ String defaultTaskController =
new File(hadoopBin, "task-controller").getAbsolutePath();
+ taskControllerExe = conf.get(TASK_CONTROLLER_EXEC_KEY,
+ defaultTaskController);
}
-
+
public LinuxTaskController() {
super();
}
@@ -85,27 +81,49 @@ class LinuxTaskController extends TaskCo
/**
* List of commands that the setuid script will execute.
*/
- enum TaskControllerCommands {
- INITIALIZE_USER,
- INITIALIZE_JOB,
- INITIALIZE_DISTRIBUTEDCACHE_FILE,
- LAUNCH_TASK_JVM,
- INITIALIZE_TASK,
- TERMINATE_TASK_JVM,
- KILL_TASK_JVM,
- ENABLE_TASK_FOR_CLEANUP,
- ENABLE_JOB_FOR_CLEANUP
+ enum Commands {
+ INITIALIZE_JOB(0),
+ LAUNCH_TASK_JVM(1),
+ SIGNAL_TASK(2),
+ DELETE_AS_USER(3),
+ DELETE_LOG_AS_USER(4);
+
+ private int value;
+ Commands(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * Result codes returned from the C task-controller.
+ * These must match the values in task-controller.h.
+ */
+ enum ResultCode {
+ OK(0),
+ INVALID_USER_NAME(2),
+ INVALID_TASK_PID(9),
+ INVALID_CONFIG_FILE(24);
+
+ private final int value;
+ ResultCode(int value) {
+ this.value = value;
+ }
+ int getValue() {
+ return value;
+ }
}
@Override
- public void setup() throws IOException {
- super.setup();
+ public void setup(LocalDirAllocator allocator) throws IOException {
// Check the permissions of the task-controller binary by running it plainly.
// If permissions are correct, it returns an error code 1, else it returns
// 24 or something else if some other bugs are also present.
String[] taskControllerCmd =
- new String[] { getTaskControllerExecutablePath() };
+ new String[] { taskControllerExe };
ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
try {
shExec.execute();
@@ -118,50 +136,97 @@ class LinuxTaskController extends TaskCo
+ "permissions/ownership with exit code " + exitCode, e);
}
}
+ this.allocator = allocator;
}
+
- /**
- * Launch a task JVM that will run as the owner of the job.
- *
- * This method launches a task JVM by executing a setuid executable that will
- * switch to the user and run the task. Also does initialization of the first
- * task in the same setuid process launch.
- */
@Override
- void launchTaskJVM(TaskController.TaskControllerContext context)
- throws IOException {
- JvmEnv env = context.env;
- // get the JVM command line.
- String cmdLine =
- TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true);
-
- StringBuffer sb = new StringBuffer();
- //export out all the environment variable before child command as
- //the setuid/setgid binaries would not be getting, any environmental
- //variables which begin with LD_*.
- for(Entry<String, String> entry : env.env.entrySet()) {
- sb.append("export ");
- sb.append(entry.getKey());
- sb.append("=");
- sb.append(entry.getValue());
- sb.append("\n");
+ public void initializeJob(String user, String jobid, Path credentials,
+ Path jobConf, TaskUmbilicalProtocol taskTracker
+ ) throws IOException {
+ List<String> command = new ArrayList<String>(
+ Arrays.asList(taskControllerExe,
+ user,
+ Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+ jobid,
+ credentials.toUri().getPath().toString(),
+ jobConf.toUri().getPath().toString()));
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+ command.add(jvm.toString());
+ command.add("-classpath");
+ command.add(System.getProperty("java.class.path"));
+ command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+ command.add("-Dhadoop.root.logger=INFO,console");
+ command.add(JobLocalizer.class.getName()); // main of JobLocalizer
+ command.add(user);
+ command.add(jobid);
+ // add the task tracker's reporting address
+ InetSocketAddress ttAddr =
+ ((TaskTracker) taskTracker).getTaskTrackerReportAddress();
+ command.add(ttAddr.getHostName());
+ command.add(Integer.toString(ttAddr.getPort()));
+ String[] commandArray = command.toArray(new String[0]);
+ ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("initializeJob: " + Arrays.toString(commandArray));
}
- sb.append(cmdLine);
- // write the command to a file in the
- // task specific cache directory
- writeCommand(sb.toString(), getTaskCacheDirectory(context));
-
- // Call the taskcontroller with the right parameters.
- List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- TaskControllerCommands.LAUNCH_TASK_JVM,
- env.conf.getUser(),
- launchTaskJVMArgs, env.workDir, env.env);
- context.shExec = shExec;
try {
shExec.execute();
+ if (LOG.isDebugEnabled()) {
+ logOutput(shExec.getOutput());
+ }
+ } catch (ExitCodeException e) {
+ int exitCode = shExec.getExitCode();
+ logOutput(shExec.getOutput());
+ throw new IOException("Job initialization failed (" + exitCode + ")", e);
+ }
+ }
+
+ @Override
+ public int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException {
+
+ ShellCommandExecutor shExec = null;
+ try {
+ FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+ long logSize = 0; //TODO, Ref BUG:2854624
+ // get the JVM command line.
+ String cmdLine =
+ TaskLog.buildCommandLine(setup, jvmArguments,
+ new File(stdout), new File(stderr), logSize, true);
+
+ // write the command to a file in the
+ // task specific cache directory
+ Path p = new Path(allocator.getLocalPathForWrite(
+ TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+ getConf()), COMMAND_FILE);
+ String commandFile = writeCommand(cmdLine, rawFs, p);
+
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+ jobId,
+ attemptId,
+ currentWorkDirectory.toString(),
+ commandFile};
+ shExec = new ShellCommandExecutor(command);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("launchTask: " + Arrays.toString(command));
+ }
+ shExec.execute();
} catch (Exception e) {
+ if (shExec == null) {
+ return -1;
+ }
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from task is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -173,445 +238,56 @@ class LinuxTaskController extends TaskCo
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
logOutput(shExec.getOutput());
}
- throw new IOException(e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
- logOutput(shExec.getOutput());
- }
- }
-
- /**
- * Helper method that runs a LinuxTaskController command
- *
- * @param taskControllerCommand
- * @param user
- * @param cmdArgs
- * @param env
- * @throws IOException
- */
- private void runCommand(TaskControllerCommands taskControllerCommand,
- String user, List<String> cmdArgs, File workDir, Map<String, String> env)
- throws IOException {
-
- ShellCommandExecutor shExec =
- buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs,
- workDir, env);
- try {
- shExec.execute();
- } catch (Exception e) {
- LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
- + shExec.getExitCode());
- LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
- + StringUtils.stringifyException(e));
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
- logOutput(shExec.getOutput());
- throw new IOException(e);
+ return exitCode;
}
if (LOG.isDebugEnabled()) {
- LOG.info("Output from LinuxTaskController's "
- + taskControllerCommand.toString() + " follows:");
+ LOG.debug("Output from LinuxTaskController's launchTask follows:");
logOutput(shExec.getOutput());
}
- }
-
- /**
- * Returns list of arguments to be passed while initializing a new task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands, String,
- * List<String>, JvmEnv)} documentation.
- *
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- String taskId = context.task.getTaskID().toString();
- String jobId = getJobId(context);
- commandArgs.add(jobId);
- if (!context.task.isTaskCleanupTask()) {
- commandArgs.add(taskId);
- } else {
- commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
- }
- return commandArgs;
+ return 0;
}
@Override
- void initializeTask(TaskControllerContext context)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do "
- + TaskControllerCommands.INITIALIZE_TASK.toString()
- + " for " + context.task.getTaskID().toString());
- }
- runCommand(TaskControllerCommands.INITIALIZE_TASK,
- context.env.conf.getUser(),
- buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+ public void deleteAsUser(String user, String subDir) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_AS_USER.getValue()),
+ subDir};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+ shExec.execute();
}
- /**
- * Builds the args to be passed to task-controller for enabling of task for
- * cleanup. Last arg in this List is either $attemptId or $attemptId/work
- */
- private List<String> buildTaskCleanupArgs(
- TaskControllerTaskPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.task.getJobID().toString());
-
- String workDir = "";
- if (context.isWorkDir) {
- workDir = "/work";
- }
- if (context.task.isTaskCleanupTask()) {
- commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
- + workDir);
- } else {
- commandArgs.add(context.task.getTaskID() + workDir);
- }
-
- return commandArgs;
- }
-
- /**
- * Builds the args to be passed to task-controller for enabling of job for
- * cleanup. Last arg in this List is $jobid.
- */
- private List<String> buildJobCleanupArgs(
- TaskControllerJobPathDeletionContext context) {
- List<String> commandArgs = new ArrayList<String>(2);
- commandArgs.add(context.mapredLocalDir.toUri().getPath());
- commandArgs.add(context.jobId.toString());
-
- return commandArgs;
- }
-
- /**
- * Enables the task for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
@Override
- void enableTaskForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerTaskPathDeletionContext) {
- TaskControllerTaskPathDeletionContext tContext =
- (TaskControllerTaskPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
- buildTaskCleanupArgs(tContext));
- }
- else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerTaskPathDeletionContext.");
- }
+ public void deleteLogAsUser(String user, String subDir) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+ subDir};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+ shExec.execute();
}
- /**
- * Enables the job for cleanup by changing permissions of the specified path
- * in the local filesystem
- */
@Override
- void enableJobForCleanup(PathDeletionContext context)
- throws IOException {
- if (context instanceof TaskControllerJobPathDeletionContext) {
- TaskControllerJobPathDeletionContext tContext =
- (TaskControllerJobPathDeletionContext) context;
- enablePathForCleanup(tContext,
- TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
- buildJobCleanupArgs(tContext));
- } else {
- throw new IllegalArgumentException("PathDeletionContext provided is not "
- + "TaskControllerJobPathDeletionContext.");
- }
- }
-
- /**
- * Enable a path for cleanup
- * @param c {@link TaskControllerPathDeletionContext} for the path to be
- * cleaned up
- * @param command {@link TaskControllerCommands} for task/job cleanup
- * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable
- * path cleanup
- */
- private void enablePathForCleanup(TaskControllerPathDeletionContext c,
- TaskControllerCommands command,
- List<String> cleanupArgs) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
- }
-
- if ( c.user != null && c.fs instanceof LocalFileSystem) {
- try {
- runCommand(command, c.user, cleanupArgs, null, null);
- } catch(IOException e) {
- LOG.warn("Unable to change permissions for " + c.fullPath);
- }
- }
- else {
- throw new IllegalArgumentException("Either user is null or the "
- + "file system is not local file system.");
- }
- }
-
- private void logOutput(String output) {
- String shExecOutput = output;
- if (shExecOutput != null) {
- for (String str : shExecOutput.split("\n")) {
- LOG.info(str);
- }
- }
- }
-
- private String getJobId(TaskControllerContext context) {
- String taskId = context.task.getTaskID().toString();
- TaskAttemptID tId = TaskAttemptID.forName(taskId);
- String jobId = tId.getJobID().toString();
- return jobId;
- }
-
- /**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- * @param context
- * @return Argument to be used while launching Task VM
- */
- private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
- List<String> commandArgs = new ArrayList<String>(3);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
- LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context) );
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context));
- commandArgs.addAll(buildInitializeTaskArgs(context));
- return commandArgs;
- }
-
- // Get the directory from the list of directories configured
- // in mapred.local.dir chosen for storing data pertaining to
- // this task.
- private String getDirectoryChosenForTask(File directory,
- TaskControllerContext context) {
- String jobId = getJobId(context);
- String taskId = context.task.getTaskID().toString();
- for (String dir : mapredLocalDirs) {
- File mapredDir = new File(dir);
- File taskDir =
- new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
- .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
- .getParentFile();
- if (directory.equals(taskDir)) {
- return dir;
- }
- }
-
- LOG.error("Couldn't parse task cache directory correctly");
- throw new IllegalArgumentException("invalid task cache directory "
- + directory.getAbsolutePath());
- }
-
- @Override
- public void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to initialize distributed cache for " + context.user
- + " with localizedBaseDir " + context.localizedBaseDir +
- " and uniqueString " + context.uniqueString);
- }
- List<String> args = new ArrayList<String>();
- // Here, uniqueString might start with '-'. Adding -- in front of the
- // arguments indicates that they are non-option parameters.
- args.add("--");
- args.add(context.localizedBaseDir.toString());
- args.add(context.uniqueString);
- runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE,
- context.user, args, context.workDir, null);
- }
-
- @Override
- public void initializeUser(InitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize user directories for " + context.user
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
- new ArrayList<String>(), context.workDir, null);
- }
-
- /**
- * Builds the command line for launching/terminating/killing task JVM.
- * Following is the format for launching/terminating/killing task JVM
- * <br/>
- * For launching following is command line argument:
- * <br/>
- * {@code user-name command tt-root job_id task_id}
- * <br/>
- * For terminating/killing task jvm.
- * {@code user-name command tt-root task-pid}
- *
- * @param command command to be executed.
- * @param userName user name
- * @param cmdArgs list of extra arguments
- * @param workDir working directory for the task-controller
- * @param env JVM environment variables.
- * @return {@link ShellCommandExecutor}
- * @throws IOException
- */
- private ShellCommandExecutor buildTaskControllerExecutor(
- TaskControllerCommands command, String userName, List<String> cmdArgs,
- File workDir, Map<String, String> env)
- throws IOException {
- String[] taskControllerCmd = new String[3 + cmdArgs.size()];
- taskControllerCmd[0] = getTaskControllerExecutablePath();
- taskControllerCmd[1] = userName;
- taskControllerCmd[2] = String.valueOf(command.ordinal());
- int i = 3;
- for (String cmdArg : cmdArgs) {
- taskControllerCmd[i++] = cmdArg;
- }
- if (LOG.isDebugEnabled()) {
- for (String cmd : taskControllerCmd) {
- LOG.debug("taskctrl command = " + cmd);
- }
- }
- ShellCommandExecutor shExec = null;
- if(workDir != null && workDir.exists()) {
- shExec = new ShellCommandExecutor(taskControllerCmd,
- workDir, env);
- } else {
- shExec = new ShellCommandExecutor(taskControllerCmd);
- }
-
- return shExec;
- }
-
- // Return the task specific directory under the cache.
- private String getTaskCacheDirectory(TaskControllerContext context) {
- // In the case of JVM reuse, the task specific directory
- // is different from what is set with respect with
- // env.workDir. Hence building this from the taskId everytime.
- String taskId = context.task.getTaskID().toString();
- File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
- if(context.task.isTaskCleanupTask()) {
- taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
- }
- return new File(cacheDirForJob, taskId).getAbsolutePath();
- }
-
- // Write the JVM command line to a file under the specified directory
- // Note that the JVM will be launched using a setuid executable, and
- // could potentially contain strings defined by a user. Hence, to
- // prevent special character attacks, we write the command line to
- // a file and execute it.
- private void writeCommand(String cmdLine,
- String directory) throws IOException {
-
- PrintWriter pw = null;
- String commandFile = directory + File.separator + COMMAND_FILE;
- LOG.info("Writing commands to " + commandFile);
- try {
- FileWriter fw = new FileWriter(commandFile);
- BufferedWriter bw = new BufferedWriter(fw);
- pw = new PrintWriter(bw);
- pw.write(cmdLine);
- } catch (IOException ioe) {
- LOG.error("Caught IOException while writing JVM command line to file. "
- + ioe.getMessage());
- } finally {
- if (pw != null) {
- pw.close();
- }
- // set execute permissions for all on the file.
- File f = new File(commandFile);
- if (f.exists()) {
- f.setReadable(true, false);
- f.setExecutable(true, false);
- }
- }
- }
-
- protected String getTaskControllerExecutablePath() {
- return taskControllerExe;
- }
-
- private List<String> buildInitializeJobCommandArgs(
- JobInitializationContext context) {
- List<String> initJobCmdArgs = new ArrayList<String>();
- initJobCmdArgs.add(context.jobid.toString());
- return initJobCmdArgs;
- }
-
- @Override
- void initializeJob(JobInitializationContext context)
- throws IOException {
- LOG.debug("Going to initialize job " + context.jobid.toString()
- + " on the TT");
- runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
- buildInitializeJobCommandArgs(context), context.workDir, null);
- }
-
- /**
- * API which builds the command line to be pass to LinuxTaskController
- * binary to terminate/kill the task. See
- * {@code buildTaskControllerExecutor(TaskControllerCommands,
- * String, List<String>, JvmEnv)} documentation.
- *
- *
- * @param context context of task which has to be passed kill signal.
- *
- */
- private List<String> buildKillTaskCommandArgs(TaskControllerContext
- context){
- List<String> killTaskJVMArgs = new ArrayList<String>();
- killTaskJVMArgs.add(context.pid);
- return killTaskJVMArgs;
- }
-
- /**
- * Convenience method used to sending appropriate Kill signal to the task
- * VM
- * @param context
- * @param command
- * @throws IOException
- */
- private void finishTask(TaskControllerContext context,
- TaskControllerCommands command) throws IOException{
- if(context.task == null) {
- LOG.info("Context task null not killing the JVM");
- return;
- }
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env.workDir,
- context.env.env);
+ public void signalTask(String user, int taskPid,
+ Signal signal) throws IOException {
+ String[] command =
+ new String[]{taskControllerExe,
+ user,
+ Integer.toString(Commands.SIGNAL_TASK.getValue()),
+ Integer.toString(taskPid),
+ Integer.toString(signal.getValue())};
+ ShellCommandExecutor shExec = new ShellCommandExecutor(command);
try {
shExec.execute();
- } catch (Exception e) {
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
- throw new IOException(e);
- }
- }
-
- @Override
- void terminateTask(TaskControllerContext context) {
- try {
- finishTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending kill to the Task VM " +
- StringUtils.stringifyException(e));
- }
- }
-
- @Override
- void killTask(TaskControllerContext context) {
- try {
- finishTask(context, TaskControllerCommands.KILL_TASK_JVM);
- } catch (Exception e) {
- LOG.warn("Exception thrown while sending destroy to the Task VM " +
- StringUtils.stringifyException(e));
+ } catch (ExitCodeException e) {
+ int ret_code = shExec.getExitCode();
+ if (ret_code != ResultCode.INVALID_TASK_PID.getValue()) {
+ logOutput(shExec.getOutput());
+ throw new IOException("Problem signalling task " + taskPid + " with " +
+ signal + "; exit = " + ret_code);
+ }
}
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 4 04:43:33 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.filecache.Distr
import org.apache.hadoop.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -59,6 +58,7 @@ class LocalJobRunner implements JobSubmi
private int map_tasks = 0;
private int reduce_tasks = 0;
final Random rand = new Random();
+ private final TaskController taskController = new DefaultTaskController();
private JobTrackerInstrumentation myMetrics = null;
@@ -89,7 +89,7 @@ class LocalJobRunner implements JobSubmi
private FileSystem localFs;
boolean killed = false;
- private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+ private TrackerDistributedCacheManager trackerDistributedCacheManager;
private TaskDistributedCacheManager taskDistributedCacheManager;
// Counters summed over all the map/reduce tasks which
@@ -115,14 +115,13 @@ class LocalJobRunner implements JobSubmi
// Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again.
- this.trackerDistributerdCacheManager =
- new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+ this.trackerDistributedCacheManager =
+ new TrackerDistributedCacheManager(conf, taskController);
this.taskDistributedCacheManager =
- trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(
- new LocalDirAllocator("mapred.local.dir"),
- new File(systemJobDir.toString()),
- "archive", "archive");
+ trackerDistributedCacheManager.newTaskDistributedCacheManager(jobid, conf);
+ taskDistributedCacheManager.setupCache(
+ "archive",
+ "archive");
if (DistributedCache.getSymlink(conf)) {
// This is not supported largely because,
@@ -302,7 +301,7 @@ class LocalJobRunner implements JobSubmi
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
taskDistributedCacheManager.release();
- trackerDistributerdCacheManager.purgeCache();
+ trackerDistributedCacheManager.purgeCache();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
@@ -395,6 +394,14 @@ class LocalJobRunner implements JobSubmi
return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
false);
}
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId,
+ long[] sizes)
+ throws IOException {
+ trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+ }
}
@@ -402,6 +409,7 @@ class LocalJobRunner implements JobSubmi
this.fs = FileSystem.getLocal(conf);
this.conf = conf;
myMetrics = JobTrackerInstrumentation.create(null, new JobConf(conf));
+ taskController.setConf(conf);
}
// JobSubmissionProtocol methods
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar 4 04:43:33 2011
@@ -128,8 +128,10 @@ class MapTask extends Task {
@Override
public TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) {
- return new MapTaskRunner(tip, tracker, this.conf);
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new MapTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Fri Mar 4 04:43:33 2011
@@ -24,8 +24,10 @@ import org.apache.hadoop.mapred.TaskTrac
/** Runs a map task. */
class MapTaskRunner extends TaskRunner {
- public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
- super(task, tracker, conf);
+ public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+ TaskTracker.RunningJob rjob)
+ throws IOException {
+ super(task, tracker, conf, rjob);
}
/** Delete any temporary files from previous failed attempts. */
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 04:43:33 2011
@@ -172,9 +172,10 @@ class ReduceTask extends Task {
}
@Override
- public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
- throws IOException {
- return new ReduceTaskRunner(tip, tracker, this.conf);
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Mar 4 04:43:33 2011
@@ -25,9 +25,10 @@ import org.apache.hadoop.mapred.TaskTrac
class ReduceTaskRunner extends TaskRunner {
public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker,
- JobConf conf) throws IOException {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
- super(task, tracker, conf);
+ super(task, tracker, conf, rjob);
}
/** Assemble all of the map output files */
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 04:43:33 2011
@@ -451,7 +451,9 @@ abstract public class Task implements Wr
/** Return an appropriate thread runner for this task.
* @param tip TODO*/
public abstract TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) throws IOException;
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException;
/** The number of milliseconds between progress reports. */
public static final int PROGRESS_INTERVAL = 3000;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar 4 04:43:33 2011
@@ -17,19 +17,24 @@
*/
package org.apache.hadoop.mapred;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ProcessTree.Signal;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
/**
@@ -48,360 +53,153 @@ import org.apache.hadoop.util.Shell.Shel
public abstract class TaskController implements Configurable {
private Configuration conf;
-
+
public static final Log LOG = LogFactory.getLog(TaskController.class);
+ //Name of the executable script that will contain the child
+ // JVM command line. See writeCommand for details.
+ protected static final String COMMAND_FILE = "taskjvm.sh";
+
+ protected LocalDirAllocator allocator;
+
+ final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx------
+
public Configuration getConf() {
return conf;
}
- // The list of directory paths specified in the variable mapred.local.dir.
- // This is used to determine which among the list of directories is picked up
- // for storing data for a particular task.
- protected String[] mapredLocalDirs;
-
public void setConf(Configuration conf) {
this.conf = conf;
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
/**
- * Sets up the permissions of the following directories on all the configured
- * disks:
- * <ul>
- * <li>mapred-local directories</li>
- * <li>Hadoop log directories</li>
- * </ul>
+ * Does initialization and setup.
+ * @param allocator the local dir allocator to use
*/
- public void setup() throws IOException {
- for (String localDir : this.mapredLocalDirs) {
- // Set up the mapred-local directories.
- File mapredlocalDir = new File(localDir);
- if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
- LOG.warn("Unable to create mapred-local directory : "
- + mapredlocalDir.getPath());
- } else {
- Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
- Localizer.PermissionsHandler.sevenFiveFive);
- }
- }
-
- // Set up the user log directory
- File taskLog = TaskLog.getUserLogDir();
- if (!taskLog.exists() && !taskLog.mkdirs()) {
- LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
- } else {
- Localizer.PermissionsHandler.setPermissions(taskLog,
- Localizer.PermissionsHandler.sevenFiveFive);
- }
- }
-
+ public abstract void setup(LocalDirAllocator allocator) throws IOException;
+
/**
- * Take task-controller specific actions to initialize job. This involves
- * setting appropriate permissions to job-files so as to secure the files to
- * be accessible only by the user's tasks.
- *
+ * Create all of the directories necessary for the job to start and download
+ * all of the job and private distributed cache files.
+ * Creates both the user directories and the job log directory.
+ * @param user the user name
+ * @param jobid the job
+ * @param credentials a filename containing the job secrets
+ * @param jobConf the path to the localized configuration file
+ * @param taskTracker the connection to the task tracker
* @throws IOException
*/
- abstract void initializeJob(JobInitializationContext context) throws IOException;
-
- /**
- * Take task-controller specific actions to initialize the distributed cache
- * file. This involves setting appropriate permissions for these files so as
- * to secure them to be accessible only their owners.
- *
- * @param context
+ public abstract void initializeJob(String user, String jobid,
+ Path credentials, Path jobConf,
+ TaskUmbilicalProtocol taskTracker)
+ throws IOException;
+
+ /**
+ * Create all of the directories for the task and launches the child jvm.
+ * @param user the user name
+ * @param jobId the jobId in question
+ * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+ * @param setup list of shell commands to execute before the jvm
+ * @param jvmArguments list of jvm arguments
+ * @param currentWorkDirectory the full path of the cwd for the task
+ * @param stdout the file to redirect stdout to
+ * @param stderr the file to redirect stderr to
+ * @return the exit code for the task
* @throws IOException
*/
- public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
- throws IOException;
-
- /**
- * Launch a task JVM
- *
- * This method defines how a JVM will be launched to run a task. Each
- * task-controller should also do an
- * {@link #initializeTask(TaskControllerContext)} inside this method so as to
- * initialize the task before launching it. This is for reasons of
- * task-controller specific optimizations w.r.t combining initialization and
- * launching of tasks.
- *
- * @param context the context associated to the task
- */
- abstract void launchTaskJVM(TaskControllerContext context)
- throws IOException;
-
- /**
- * Top level cleanup a task JVM method.
- *
- * The current implementation does the following.
- * <ol>
- * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
- * to cleanup.</li>
- * <li>Waits for stipulated period</li>
- * <li>Sends a forceful kill signal to task JVM, terminating all its
- * sub-process forcefully.</li>
- * </ol>
- *
- * @param context the task for which kill signal has to be sent.
- */
- final void destroyTaskJVM(TaskControllerContext context) {
- terminateTask(context);
- try {
- Thread.sleep(context.sleeptimeBeforeSigkill);
- } catch (InterruptedException e) {
- LOG.warn("Sleep interrupted : " +
- StringUtils.stringifyException(e));
- }
- killTask(context);
- }
-
- /** Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context)
- throws IOException;
-
- /**
- * Contains task information required for the task controller.
+ public abstract
+ int launchTask(String user,
+ String jobId,
+ String attemptId,
+ List<String> setup,
+ List<String> jvmArguments,
+ File currentWorkDirectory,
+ String stdout,
+ String stderr) throws IOException;
+
+ /**
+ * Send a signal to a task pid as the user.
+ * @param user the user name
+ * @param taskPid the pid of the task
+ * @param signal the id of the signal to send
+ */
+ public abstract void signalTask(String user, int taskPid,
+ Signal signal) throws IOException;
+
+ /**
+ * Delete the user's files under all of the task tracker root directories.
+ * @param user the user name
+ * @param subDir the path relative to the user's subdirectory under
+ * the task tracker root directories.
+ * @throws IOException
*/
- static class TaskControllerContext {
- // task being executed
- Task task;
- ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
-
- // Information used only when this context is used for launching new tasks.
- JvmEnv env; // the JVM environment for the task.
-
- // Information used only when this context is used for destroying a task jvm.
- String pid; // process handle of task JVM.
- long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- }
-
+ public abstract void deleteAsUser(String user,
+ String subDir) throws IOException;
+
/**
- * Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the file/dir
+ * Delete the user's files under the userlogs directory.
+ * @param user the user to work as
+ * @param subDir the path under the userlogs directory.
+ * @throws IOException
*/
- static abstract class TaskControllerPathDeletionContext
- extends PathDeletionContext {
- TaskController taskController;
- String user;
-
- /**
- * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir,
- * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir,
- * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId,
- * taskId, etc.
- */
- Path mapredLocalDir;
-
- public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
- TaskController taskController,
- String user) {
- super(fs, null);
- this.taskController = taskController;
- this.mapredLocalDir = mapredLocalDir;
+ public abstract void deleteLogAsUser(String user,
+ String subDir) throws IOException;
+
+ static class DeletionContext extends CleanupQueue.PathDeletionContext {
+ private TaskController controller;
+ private boolean isLog;
+ private String user;
+ private String subDir;
+ DeletionContext(TaskController controller, boolean isLog, String user,
+ String subDir) {
+ super(null, null);
+ this.controller = controller;
+ this.isLog = isLog;
this.user = user;
+ this.subDir = subDir;
}
-
- @Override
- protected String getPathForCleanup() {
- if (fullPath == null) {
- fullPath = buildPathForDeletion();
- }
- return fullPath;
- }
-
- /**
- * Return the component of the path under the {@link #mapredLocalDir} to be
- * cleaned up. Its the responsibility of the class that extends
- * {@link TaskControllerPathDeletionContext} to provide the correct
- * component. For example
- * - For task related cleanups, either the task-work-dir or task-local-dir
- * might be returned depending on jvm reuse.
- * - For job related cleanup, simply the job-local-dir might be returned.
- */
- abstract protected String getPath();
-
- /**
- * Builds the path of taskAttemptDir OR taskWorkDir based on
- * mapredLocalDir, jobId, taskId, etc
- */
- String buildPathForDeletion() {
- return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
- }
- }
-
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the task-work-dir or
- * task-local-dir depending on whether the jvm is reused or not.
- */
- static class TaskControllerTaskPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final Task task;
- final boolean isWorkDir;
- public TaskControllerTaskPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, Task task, boolean isWorkDir,
- TaskController taskController) {
- super(fs, mapredLocalDir, taskController, task.getUser());
- this.task = task;
- this.isWorkDir = isWorkDir;
- }
-
- /**
- * Returns the taskWorkDir or taskLocalDir based on whether
- * {@link TaskControllerTaskPathDeletionContext} is configured to delete
- * the workDir.
- */
- @Override
- protected String getPath() {
- String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask())
- : TaskTracker.getLocalTaskDir(task.getUser(),
- task.getJobID().toString(), task.getTaskID().toString(),
- task.isTaskCleanupTask());
- return subDir;
- }
-
- /**
- * Makes the path(and its subdirectories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
@Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableTaskForCleanup(this);
+ protected void deletePath() throws IOException {
+ if (isLog) {
+ controller.deleteLogAsUser(user, subDir);
+ } else {
+ controller.deleteAsUser(user, subDir);
}
}
}
- /** Contains info related to the path of the file/dir to be deleted. This info
- * is needed by task-controller to build the full path of the job-local-dir.
- */
- static class TaskControllerJobPathDeletionContext
- extends TaskControllerPathDeletionContext {
- final JobID jobId;
-
- public TaskControllerJobPathDeletionContext(FileSystem fs,
- Path mapredLocalDir, JobID id, String user,
- TaskController taskController) {
- super(fs, mapredLocalDir, taskController, user);
- this.jobId = id;
- }
-
- /**
- * Returns the jobLocalDir of the job to be cleaned up.
- */
- @Override
- protected String getPath() {
- return TaskTracker.getLocalJobDir(user, jobId.toString());
- }
-
- /**
- * Makes the path(and its sub-directories recursively) fully deletable by
- * setting proper permissions(770) by task-controller
- */
- @Override
- protected void enablePathForCleanup() throws IOException {
- getPathForCleanup();// allow init of fullPath, if not inited already
- if (fs.exists(new Path(fullPath))) {
- taskController.enableJobForCleanup(this);
+ //Write the JVM command line to a file under the specified directory
+ // Note that the JVM will be launched using a setuid executable, and
+ // could potentially contain strings defined by a user. Hence, to
+ // prevent special character attacks, we write the command line to
+ // a file and execute it.
+ protected static String writeCommand(String cmdLine, FileSystem fs,
+ Path commandFile) throws IOException {
+ PrintWriter pw = null;
+ LOG.info("Writing commands to " + commandFile);
+ try {
+ pw = new PrintWriter(FileSystem.create(
+ fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+ pw.write(cmdLine);
+ } catch (IOException ioe) {
+ LOG.error("Caught IOException while writing JVM command line to file. ",
+ ioe);
+ } finally {
+ if (pw != null) {
+ pw.close();
}
}
+ return commandFile.makeQualified(fs).toUri().getPath();
}
- /**
- * NOTE: This class is internal only class and not intended for users!!
- *
- */
- public static class InitializationContext {
- public File workDir;
- public String user;
-
- public InitializationContext() {
- }
-
- public InitializationContext(String user, File workDir) {
- this.user = user;
- this.workDir = workDir;
- }
- }
-
- /**
- * This is used for initializing the private localized files in distributed
- * cache. Initialization would involve changing permission, ownership and etc.
- */
- public static class DistributedCacheFileContext extends InitializationContext {
- // base directory under which file has been localized
- Path localizedBaseDir;
- // the unique string used to construct the localized path
- String uniqueString;
-
- public DistributedCacheFileContext(String user, File workDir,
- Path localizedBaseDir, String uniqueString) {
- super(user, workDir);
- this.localizedBaseDir = localizedBaseDir;
- this.uniqueString = uniqueString;
- }
-
- public Path getLocalizedUniqueDir() {
- return new Path(localizedBaseDir, new Path(TaskTracker
- .getPrivateDistributedCacheDir(user), uniqueString));
+ protected void logOutput(String output) {
+ String shExecOutput = output;
+ if (shExecOutput != null) {
+ for (String str : shExecOutput.split("\n")) {
+ LOG.info(str);
+ }
}
}
-
- static class JobInitializationContext extends InitializationContext {
- JobID jobid;
- }
-
- /**
- * Sends a graceful terminate signal to taskJVM and it sub-processes.
- *
- * @param context task context
- */
- abstract void terminateTask(TaskControllerContext context);
-
- /**
- * Enable the task for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableTaskForCleanup(PathDeletionContext context)
- throws IOException;
- /**
- * Sends a KILL signal to forcefully terminate the taskJVM and its
- * sub-processes.
- *
- * @param context task context
- */
- abstract void killTask(TaskControllerContext context);
-
- /**
- * Initialize user on this TaskTracer in a TaskController specific manner.
- *
- * @param context
- * @throws IOException
- */
- public abstract void initializeUser(InitializationContext context)
- throws IOException;
-
- /**
- * Enable the job for cleanup by changing permissions of the path
- * @param context path deletion context
- * @throws IOException
- */
- abstract void enableJobForCleanup(PathDeletionContext context)
- throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar 4 04:43:33 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -158,7 +159,14 @@ public class TaskLog {
static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
String cleanupSuffix = isCleanup ? ".cleanup" : "";
- return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+ return getAttemptDir(taskid.getJobID().toString(),
+ taskid.toString() + cleanupSuffix);
+ }
+
+ static File getAttemptDir(String jobid, String taskid) {
+ // taskid should be fully formed and it should have the optional
+ // .cleanup suffix
+ return new File(getJobDir(jobid), taskid);
}
static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -232,6 +240,9 @@ public class TaskLog {
}
}
}
+ if (currentTaskid == null) {
+ currentTaskid = taskid;
+ }
// set start and end
for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
if (currentTaskid != taskid) {
@@ -493,7 +504,8 @@ public class TaskLog {
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
- String mergedCmd = buildCommandLine(setup, cmd,
+ String mergedCmd = buildCommandLine(setup,
+ cmd,
stdoutFilename,
stderrFilename, tailLength,
useSetsid);
@@ -511,15 +523,17 @@ public class TaskLog {
String stdout = FileUtil.makeShellPath(stdoutFilename);
String stderr = FileUtil.makeShellPath(stderrFilename);
- StringBuffer mergedCmd = new StringBuffer();
+ StringBuilder mergedCmd = new StringBuilder();
if (!Shell.WINDOWS) {
- mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+ mergedCmd.append("export JVM_PID=`echo $$`\n");
}
- if (setup != null && setup.size() > 0) {
- mergedCmd.append(addCommand(setup, false));
- mergedCmd.append(";");
+ if (setup != null) {
+ for (String s : setup) {
+ mergedCmd.append(s);
+ mergedCmd.append("\n");
+ }
}
if (tailLength > 0) {
mergedCmd.append("(");
@@ -627,11 +641,21 @@ public class TaskLog {
/**
* Get the user log directory for the job jobid.
*
- * @param jobid
+ * @param jobid string representation of the jobid
+ * @return user log directory for the job
+ */
+ public static File getJobDir(String jobid) {
+ return new File(getUserLogDir(), jobid);
+ }
+
+ /**
+ * Get the user log directory for the job jobid.
+ *
+ * @param jobid the jobid object
* @return user log directory for the job
*/
public static File getJobDir(JobID jobid) {
- return new File(getUserLogDir(), jobid.toString());
+ return getJobDir(jobid.toString());
}
} // TaskLog
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Mar 4 04:43:33 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapred;
-import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -184,13 +183,8 @@ class TaskMemoryManagerThread extends Th
// itself is still retained in runningTasks till successful
// transmission to JT
- // create process tree object
- long sleeptimeBeforeSigkill = taskTracker.getJobConf().getLong(
- "mapred.tasktracker.tasks.sleeptime-before-sigkill",
- ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
- ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(
- pId,ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
+ ProcfsBasedProcessTree pt =
+ new ProcfsBasedProcessTree(pId, ProcessTree.isSetsidAvailable);
LOG.debug("Tracking ProcessTree " + pId + " for the first time");
ptInfo.setPid(pId);
@@ -228,10 +222,9 @@ class TaskMemoryManagerThread extends Th
+ "bytes. Killing task. \nDump of the process-tree for "
+ tid + " : \n" + pTree.getProcessTreeDump();
LOG.warn(msg);
+ // kill the task
taskTracker.cleanUpOverMemoryTask(tid, true, msg);
- // Now destroy the ProcessTree, remove it from monitoring map.
- pTree.destroy(true/*in the background*/);
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
@@ -365,8 +358,6 @@ class TaskMemoryManagerThread extends Th
taskTracker.cleanUpOverMemoryTask(tid, false, msg);
// Now destroy the ProcessTree, remove it from monitoring map.
ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
- ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
- pTree.destroy(true/*in the background*/);
processTreeInfoMap.remove(tid);
LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar 4 04:43:33 2011
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Vector;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.JobContext;
/** Base class that runs a task in a separate process. Tasks are run in a
* separate process in order to isolate the map/reduce system code from bugs in
@@ -74,6 +73,8 @@ abstract class TaskRunner extends Thread
static final String DEFAULT_HOME_DIR= "/homes/";
+ static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+
static final String MAPRED_ADMIN_USER_ENV =
"mapreduce.admin.user.env";
@@ -88,13 +89,19 @@ abstract class TaskRunner extends Thread
private int exitCode = -1;
private boolean exitCodeSet = false;
- private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+ private static String SYSTEM_PATH_SEPARATOR =
+ System.getProperty("path.separator");
static final String MAPREDUCE_USER_CLASSPATH_FIRST =
"mapreduce.user.classpath.first"; //a semi-hidden config
private TaskTracker tracker;
- private TaskDistributedCacheManager taskDistributedCacheManager;
+ private final TaskDistributedCacheManager taskDistributedCacheManager;
+ private String[] localdirs;
+ final private static Random rand;
+ static {
+ rand = new Random();
+ }
protected JobConf conf;
JvmManager jvmManager;
@@ -105,7 +112,8 @@ abstract class TaskRunner extends Thread
protected MapOutputFile mapOutputFile;
public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker,
- JobConf conf) {
+ JobConf conf, TaskTracker.RunningJob rjob
+ ) throws IOException {
this.tip = tip;
this.t = tip.getTask();
this.tracker = tracker;
@@ -113,6 +121,8 @@ abstract class TaskRunner extends Thread
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
this.jvmManager = tracker.getJvmManagerInstance();
+ this.localdirs = conf.getLocalDirs();
+ taskDistributedCacheManager = rjob.distCacheMgr;
}
public Task getTask() { return t; }
@@ -182,27 +192,20 @@ abstract class TaskRunner extends Thread
//all the archives
TaskAttemptID taskid = t.getTaskID();
final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
+ //simply get the location of the workDir and pass it to the child. The
+ //child will do the actual dir creation
+ final File workDir =
+ new File(new Path(localdirs[rand.nextInt(localdirs.length)],
+ TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
+ taskid.toString(),
+ t.isTaskCleanupTask())).toString());
- // We don't create any symlinks yet, so presence/absence of workDir
- // actually on the file system doesn't matter.
String user = tip.getUGI().getUserName();
- tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws IOException {
- taskDistributedCacheManager =
- tracker.getTrackerDistributedCacheManager()
- .newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
- .getPrivateDistributedCacheDir(conf.getUser()),
- TaskTracker.getPublicDistributedCacheDir());
- return null;
- }
- });
// Set up the child task's configuration. After this call, no localization
// of files should happen in the TaskTracker's process space. Any changes to
// the conf object after this will NOT be reflected to the child.
- setupChildTaskConfiguration(lDirAlloc);
+ // setupChildTaskConfiguration(lDirAlloc);
if (!prepare()) {
return;
@@ -220,7 +223,7 @@ abstract class TaskRunner extends Thread
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- List<String> setup = getVMSetupCmd();
+ String setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
File stdout = logFiles[0];
@@ -231,8 +234,21 @@ abstract class TaskRunner extends Thread
Map<String, String> env = new HashMap<String, String>();
errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
logSize);
-
- launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+
+ // flatten the env as a set of export commands
+ List <String> setupCmds = new ArrayList<String>();
+ for(Entry<String, String> entry : env.entrySet()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("export ");
+ sb.append(entry.getKey());
+ sb.append("=\"");
+ sb.append(entry.getValue());
+ sb.append("\"");
+ setupCmds.add(sb.toString());
+ }
+ setupCmds.add(setup);
+
+ launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
if (!killed && exitCode != 0) {
@@ -261,13 +277,6 @@ abstract class TaskRunner extends Thread
LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
}
} finally {
- try{
- if (taskDistributedCacheManager != null) {
- taskDistributedCacheManager.release();
- }
- }catch(IOException ie){
- LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
- }
// It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
// *false* since the task has either
@@ -277,11 +286,11 @@ abstract class TaskRunner extends Thread
}
}
- void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
- File stderr, long logSize, File workDir, Map<String, String> env)
- throws InterruptedException {
+ void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+ File stderr, long logSize, File workDir)
+ throws InterruptedException, IOException {
jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
- stderr, logSize, workDir, env, conf));
+ stderr, logSize, workDir, conf));
synchronized (lock) {
while (!done) {
lock.wait();
@@ -332,7 +341,7 @@ abstract class TaskRunner extends Thread
.isTaskCleanupTask()), conf);
// write the child's task configuration file to the local disk
- writeLocalTaskFile(localTaskFile.toString(), conf);
+ JobLocalizer.writeLocalJobFile(localTaskFile, conf);
// Set the final job file in the task. The child needs to know the correct
// path to job.xml. So set this path accordingly.
@@ -342,16 +351,21 @@ abstract class TaskRunner extends Thread
/**
* @return
*/
- private List<String> getVMSetupCmd() {
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf));
- List<String> setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList<String>();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
+ private String getVMSetupCmd() {
+ final int ulimit = getChildUlimit(conf);
+ if (ulimit <= 0) {
+ return "";
+ }
+ String setup[] = Shell.getUlimitMemoryCommand(ulimit);
+ StringBuilder command = new StringBuilder();
+ for (String str : setup) {
+ command.append('\'');
+ command.append(str);
+ command.append('\'');
+ command.append(" ");
}
- return setup;
+ command.append("\n");
+ return command.toString();
}
/**
@@ -434,7 +448,7 @@ abstract class TaskRunner extends Thread
vargs.add(javaOptsSplit[i]);
}
- Path childTmpDir = createChildTmpDir(workDir, conf);
+ Path childTmpDir = createChildTmpDir(workDir, conf, false);
vargs.add("-Djava.io.tmpdir=" + childTmpDir);
// Add classpath.
@@ -483,7 +497,7 @@ abstract class TaskRunner extends Thread
* @throws IOException
*/
static Path createChildTmpDir(File workDir,
- JobConf conf)
+ JobConf conf, boolean createDir)
throws IOException {
// add java.io.tmpdir given by mapred.child.tmp
@@ -493,10 +507,13 @@ abstract class TaskRunner extends Thread
// if temp directory path is not absolute, prepend it with workDir.
if (!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
-
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ if (createDir) {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.mkdirs(tmpDir) &&
+ !localFs.getFileStatus(tmpDir).isDir()) {
+ throw new IOException("Mkdirs failed to create " +
+ tmpDir.toString());
+ }
}
}
return tmpDir;
@@ -535,9 +552,10 @@ abstract class TaskRunner extends Thread
return classPaths;
}
- private String getVMEnvironment(String errorInfo, String user, File workDir, JobConf conf,
- Map<String, String> env, TaskAttemptID taskid, long logSize)
- throws Throwable {
+ private String getVMEnvironment(String errorInfo, String user, File workDir,
+ JobConf conf, Map<String, String> env,
+ TaskAttemptID taskid, long logSize
+ ) throws Throwable {
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
String oldLdLibraryPath = null;
@@ -547,11 +565,13 @@ abstract class TaskRunner extends Thread
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ env.put(HADOOP_WORK_DIR, workDir.toString());
//update user configured login-shell properties
updateUserLoginEnv(errorInfo, user, conf, env);
+ // put jobTokenFile name into env
String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
- LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
- env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
+ LOG.debug("putting jobToken file name into environment " + jobTokenFile);
+ env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
// for the child of task jvm, set hadoop.root.logger
env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
@@ -560,9 +580,9 @@ abstract class TaskRunner extends Thread
} else {
hadoopClientOpts = hadoopClientOpts + " ";
}
- hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
- + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
- + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+ hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" +
+ taskid + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask() +
+ " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
// following line is a backport from jira MAPREDUCE-1286
env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
@@ -639,25 +659,6 @@ abstract class TaskRunner extends Thread
}
/**
- * Write the task specific job-configuration file.
- *
- * @param localFs
- * @throws IOException
- */
- private static void writeLocalTaskFile(String jobFile, JobConf conf)
- throws IOException {
- Path localTaskFile = new Path(jobFile);
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- /**
* Prepare the mapred.local.dir for the child. The child is sand-boxed now.
* Whenever it uses LocalDirAllocator from now on inside the child, it will
* only see files inside the attempt-directory. This is done in the Child's
@@ -681,15 +682,11 @@ abstract class TaskRunner extends Thread
}
/** Creates the working directory pathname for a task attempt. */
- static File formWorkDir(LocalDirAllocator lDirAlloc,
- TaskAttemptID task, boolean isCleanup, JobConf conf)
+ static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf)
throws IOException {
Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
- conf.getUser(), task.getJobID().toString(), task.toString(),
- isCleanup), conf);
-
- return new File(workDir.toString());
+ lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+ return workDir;
}
private static void appendSystemClasspaths(List<String> classPaths) {
@@ -780,7 +777,7 @@ abstract class TaskRunner extends Thread
}
}
- createChildTmpDir(workDir, conf);
+ createChildTmpDir(workDir, conf, true);
}
/**
@@ -804,8 +801,10 @@ abstract class TaskRunner extends Thread
/**
* Kill the child process
+ * @throws InterruptedException
+ * @throws IOException
*/
- public void kill() {
+ public void kill() throws IOException, InterruptedException {
killed = true;
jvmManager.taskKilled(this);
signalDone();